Posted to commits@impala.apache.org by ta...@apache.org on 2016/05/26 02:41:51 UTC

[1/4] incubator-impala git commit: IMPALA-3501: ee tests: detect build type and support different timeouts based on the same

Repository: incubator-impala
Updated Branches:
  refs/heads/master f09c6311c -> 22669e23b


IMPALA-3501: ee tests: detect build type and support different timeouts based on the same

Impala compiled with the address sanitizer (ASAN), or compiled with code
coverage, runs much more slowly. This can cause end-to-end tests that
pass on a non-ASAN or non-code-coverage build to fail. Some examples
include IMPALA-2721, IMPALA-2973, and IMPALA-3501. Failures of this kind
almost always involve some time-sensitive condition that is not met in
time under such "slow builds".

The workaround in the past has been to simply increase the timeout.
The problem with this approach is that it relaxes test conditions on
builds that ship to the field (i.e., release builds) for the sake of
builds that never will (i.e., ASAN and code coverage).

This patch fixes that problem by allowing test authors to set timeout
values based on a *specific* build type. The author may choose timeouts
with a default value, and different timeouts for either or both of the
so-called "slow builds": ASAN and code coverage.
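As a rough sketch of that precedence (a build-specific timeout beats the generic slow-build timeout, which beats the default), here is a standalone version that takes the build type as a plain string instead of detecting it from the binary. The function name `pick_timeout` and the module-level labels are hypothetical and only mirror the five build types this patch defines:

```python
# Hypothetical labels mirroring the five specific build types in this patch.
ASAN = 'address_sanitizer'
DEBUG = 'debug'
DEBUG_CODE_COVERAGE = 'debug_code_coverage'
RELEASE = 'release'
RELEASE_CODE_COVERAGE = 'release_code_coverage'

CODE_COVERAGE_TYPES = (DEBUG_CODE_COVERAGE, RELEASE_CODE_COVERAGE)


def pick_timeout(build_type, default_timeout, slow_build_timeout=None,
                 asan_build_timeout=None, code_coverage_build_timeout=None):
  """Hypothetical helper: choose a timeout for an explicitly given build type."""
  is_asan = build_type == ASAN
  has_code_coverage = build_type in CODE_COVERAGE_TYPES
  runs_slowly = is_asan or has_code_coverage
  # Most specific override first, then the generic "slow build" value,
  # and finally the default that applies to ordinary debug/release builds.
  if is_asan and asan_build_timeout is not None:
    return asan_build_timeout
  if has_code_coverage and code_coverage_build_timeout is not None:
    return code_coverage_build_timeout
  if runs_slowly and slow_build_timeout is not None:
    return slow_build_timeout
  return default_timeout


# Only code coverage builds get the longer wait; every other build keeps 40.
print(pick_timeout(RELEASE, 40, code_coverage_build_timeout=60))
print(pick_timeout(DEBUG_CODE_COVERAGE, 40, code_coverage_build_timeout=60))
```

Checking the per-type overrides before slow_build_timeout lets an author relax a timeout for one slow build (say, code coverage) without also relaxing it for ASAN.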

We detect the so-called "specific build type" by inspecting the binary
expected to be at the path under test. This removes the need to make
alterations to Impala itself. The inspection reads the DWARF information
in the binary, specifically the DW_AT_producer and DW_AT_name attributes
of the first compile unit's top-level DIE. We employ a heuristic
based on these attributes' values to guess the build type. If we can't
determine the build type, we will assume it's a debug build. More
information on this is in IMPALA-3501.
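A minimal sketch of that heuristic, separated from pyelftools so it can be exercised on plain strings. The compile-unit file names and compiler flags are the ones used by this patch's `_set_impalad_build_type`; the function name `guess_build_type` and the sample producer strings are hypothetical, not copied from a real impalad. It assumes the compiler records its command-line switches in DW_AT_producer (the patch's comments point to gcc's -grecord-gcc-switches):

```python
# Markers taken from _set_impalad_build_type in this patch.
ASAN_CU_NAME = 'asan_preinit.cc'
NON_ASAN_CU_NAME = 'daemon-main.cc'
GDB_FLAG = '-ggdb'
CODE_COVERAGE_FLAG = '-ftest-coverage'


def guess_build_type(die_name, die_producer):
  """Hypothetical helper: map the first CU's DW_AT_name and DW_AT_producer
  strings to a build type label, falling back to 'debug' when unsure."""
  if die_name is None or die_producer is None:
    return 'debug'  # not enough DWARF info
  if die_name.endswith(ASAN_CU_NAME):
    return 'address_sanitizer'  # ASAN runtime's CU is linked in first
  if not die_name.endswith(NON_ASAN_CU_NAME):
    return 'debug'  # unexpected first compile unit
  is_debug = GDB_FLAG in die_producer
  if CODE_COVERAGE_FLAG in die_producer:
    return 'debug_code_coverage' if is_debug else 'release_code_coverage'
  return 'debug' if is_debug else 'release'


# Made-up producer strings, for illustration only.
print(guess_build_type('be/src/service/daemon-main.cc',
                       'GNU C++ 4.9.2 -ggdb -ftest-coverage'))  # debug_code_coverage
print(guess_build_type('be/src/service/daemon-main.cc',
                       'GNU C++ 4.9.2 -O3'))  # release
```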

A quick summary of the changes follows:

1. Move some of the logic in tests.common.skip to tests.common.environ
   and rework some skip marks to be more precise.

2. Add Pyelftools for convenient deserialization of DWARF.

3. Our Pyelftools usage requires collections.OrderedDict, which isn't in
   Python 2.6; also add Monkeypatch to handle this.

4. Add ImpaladBuild and specific_build_type_timeout, the core of the new
   functionality.

5. Fix the statestore tests that only fail under code coverage (the
   basis for IMPALA-3501).
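The effect described in item 3 can be sketched with plain attribute assignment instead of the Monkeypatch package. This is an alternative illustration, not what the patch does, and it assumes the `ordereddict` backport (already pinned in requirements.txt) is installed when running on Python 2.6:

```python
import collections

try:
  # Present in Python 2.7+ and 3.x.
  from collections import OrderedDict
except ImportError:
  # Python 2.6 (assumption: the 'ordereddict' backport is installed).
  # Attach the backport to the collections module so that later
  # "from collections import OrderedDict" statements, such as those inside
  # pyelftools, find it.
  from ordereddict import OrderedDict
  collections.OrderedDict = OrderedDict
```

The patch itself applies the fallback only after importing ELFFile fails with that specific error; the sketch above applies it unconditionally before any such import.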

Testing:

The tests that previously failed reliably under code coverage now pass.
I also ran perfunctory tests of debug, release, and ASAN builds to
ensure our detection of build type is working. This patch will *not*
turn the code coverage builds green; there are other tests that fail,
and fixing all of them here is out of the scope of this patch.

Change-Id: I2b675c04c54e36d404fd9e5a6cf085fb8d6d0e47
Reviewed-on: http://gerrit.cloudera.org:8080/3156
Reviewed-by: Michael Brown <mi...@cloudera.com>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/22669e23
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/22669e23
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/22669e23

Branch: refs/heads/master
Commit: 22669e23bef947ea155156e1af85aadc2c64e64a
Parents: 6198d92
Author: Michael Brown <mi...@cloudera.com>
Authored: Mon May 9 14:15:39 2016 -0700
Committer: Tim Armstrong <ta...@cloudera.com>
Committed: Wed May 25 19:41:45 2016 -0700

----------------------------------------------------------------------
 infra/python/deps/requirements.txt           |   2 +
 tests/common/environ.py                      | 246 ++++++++++++++++++++++
 tests/common/skip.py                         |  37 +---
 tests/custom_cluster/test_alloc_fail.py      |   4 +-
 tests/custom_cluster/test_exchange_delays.py |   4 +-
 tests/statestore/test_statestore.py          |   8 +-
 6 files changed, 267 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/22669e23/infra/python/deps/requirements.txt
----------------------------------------------------------------------
diff --git a/infra/python/deps/requirements.txt b/infra/python/deps/requirements.txt
index 5409027..9ffafc6 100644
--- a/infra/python/deps/requirements.txt
+++ b/infra/python/deps/requirements.txt
@@ -36,11 +36,13 @@ impyla == 0.11.2
   # before thirdparty is built thrift will be installed anyways.
   thrift == 0.9.0
   thrift_sasl == 0.1.0
+monkeypatch == 0.1rc3
 ordereddict == 1.1
 pexpect == 3.3
 pg8000 == 1.10.2
 prettytable == 0.7.2
 psutil == 0.7.1
+pyelftools == 0.23
 pyparsing == 2.0.3
 pytest == 2.7.2
   py == 1.4.30

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/22669e23/tests/common/environ.py
----------------------------------------------------------------------
diff --git a/tests/common/environ.py b/tests/common/environ.py
new file mode 100644
index 0000000..db487e8
--- /dev/null
+++ b/tests/common/environ.py
@@ -0,0 +1,246 @@
+# Copyright (c) 2016 Cloudera, Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+import os
+import re
+
+try:
+  from elftools.elf.elffile import ELFFile
+except ImportError as e:
+  # Handle pre-python2.7s' lack of collections.OrderedDict, which we include in
+  # impala-python as ordereddict.OrderedDict.
+  if 'cannot import name OrderedDict' == str(e):
+    import monkeypatch
+    from ordereddict import OrderedDict
+    monkeypatch.patch(OrderedDict, 'collections', 'OrderedDict')
+    from elftools.elf.elffile import ELFFile
+  else:
+    raise
+
+
+LOG = logging.getLogger('tests.common.environ')
+
+
+# See if Impala is running with legacy aggregations and/or hash joins. This is kind of a
+# hack. It would be better to poll Impala whether it is doing so.
+test_start_cluster_args = os.environ.get("TEST_START_CLUSTER_ARGS", "")
+old_agg_regex = "enable_partitioned_aggregation=false"
+old_hash_join_regex = "enable_partitioned_hash_join=false"
+USING_OLD_AGGS_JOINS = re.search(old_agg_regex, test_start_cluster_args) is not None or \
+    re.search(old_hash_join_regex, test_start_cluster_args) is not None
+
+# Find the likely BuildType of the running Impala. Assume it's found through the path
+# $IMPALA_HOME/be/build/latest as a fallback.
+impala_home = os.environ.get("IMPALA_HOME", "")
+build_type_arg_regex = re.compile(r'--build_type=(\w+)', re.I)
+build_type_arg_search_result = re.search(build_type_arg_regex, test_start_cluster_args)
+
+if build_type_arg_search_result is not None:
+  build_type_dir = build_type_arg_search_result.groups()[0].lower()
+else:
+  build_type_dir = 'latest'
+
+# Resolve any symlinks in the path.
+impalad_basedir = \
+    os.path.realpath(os.path.join(impala_home, 'be/build', build_type_dir)).rstrip('/')
+
+IMPALAD_PATH = os.path.join(impalad_basedir, 'service', 'impalad')
+
+
+class SpecificImpaladBuildTypes:
+  """
+  Represent a specific build type. In reality, there are 5 specific build types. These
+  specific build types are needed by Python test code.
+
+  The specific build types and their *most distinguishing* compiler options are:
+
+  1. ADDRESS_SANITIZER (clang -fsanitize=address)
+  2. DEBUG (gcc -ggdb)
+  3. DEBUG_CODE_COVERAGE (gcc -ggdb -ftest-coverage)
+  4. RELEASE (gcc)
+  5. RELEASE_CODE_COVERAGE (gcc -ftest-coverage)
+  """
+  # ./buildall.sh -asan
+  ADDRESS_SANITIZER = 'address_sanitizer'
+  # ./buildall.sh
+  DEBUG = 'debug'
+  # ./buildall.sh -codecoverage
+  DEBUG_CODE_COVERAGE = 'debug_code_coverage'
+  # ./buildall.sh -release
+  RELEASE = 'release'
+  # ./buildall.sh -release -codecoverage
+  RELEASE_CODE_COVERAGE = 'release_code_coverage'
+
+
+class ImpaladBuild(object):
+  """
+  Acquires and provides characteristics about the way the Impala under test was compiled
+  and its likely effects on its responsiveness to automated test timings.
+  """
+  def __init__(self, impalad_path):
+    self.impalad_path = impalad_path
+    die_name, die_producer = self._get_impalad_dwarf_info()
+    self._set_impalad_build_type(die_name, die_producer)
+
+  @property
+  def specific_build_type(self):
+    """
+    Return the correct SpecificImpaladBuildTypes for the Impala under test.
+    """
+    return self._specific_build_type
+
+  def has_code_coverage(self):
+    """
+    Return whether the Impala under test was compiled with code coverage enabled.
+    """
+    return self.specific_build_type in (SpecificImpaladBuildTypes.DEBUG_CODE_COVERAGE,
+                                        SpecificImpaladBuildTypes.RELEASE_CODE_COVERAGE)
+
+  def is_asan(self):
+    """
+    Return whether the Impala under test was compiled with ASAN.
+    """
+    return self.specific_build_type == SpecificImpaladBuildTypes.ADDRESS_SANITIZER
+
+  def is_dev(self):
+    """
+    Return whether the Impala under test is a development build (i.e., any debug or ASAN
+    build).
+    """
+    return self.specific_build_type in (
+        SpecificImpaladBuildTypes.ADDRESS_SANITIZER, SpecificImpaladBuildTypes.DEBUG,
+        SpecificImpaladBuildTypes.DEBUG_CODE_COVERAGE)
+
+  def runs_slowly(self):
+    """
+    Return whether the Impala under test "runs slowly". For our purposes this means
+    either compiled with code coverage enabled or ASAN.
+    """
+    return self.has_code_coverage() or self.is_asan()
+
+  def _get_impalad_dwarf_info(self):
+    """
+    Read the impalad_path ELF binary, which is supposed to contain DWARF, and read the
+    DWARF to understand the compiler options. Return a 2-tuple of the two useful DIE
+    attributes of the first compile unit: the DW_AT_name and DW_AT_producer. If
+    something goes wrong doing this, log a warning and return (None, None).
+    """
+    # Some useful references:
+    # - be/CMakeLists.txt
+    # - gcc(1), especially -grecord-gcc-switches, -g, -ggdb, -gdwarf-2
+    # - readelf(1)
+    # - general reading about DWARF
+    # A useful command for exploration without having to wade through many bytes is:
+    # readelf --debug-dump=info --dwarf-depth=1 impalad
+    # The DWARF lines are long, raw, and nasty; I'm hesitant to paste them here, so
+    # curious readers are highly encouraged to try the above, or read IMPALA-3501.
+    die_name = None
+    die_producer = None
+    try:
+      with open(self.impalad_path, 'rb') as fh:
+        impalad_elf = ELFFile(fh)
+        if impalad_elf.has_dwarf_info():
+          dwarf_info = impalad_elf.get_dwarf_info()
+          # We only need the first CU, hence the unconventional use of the iterator
+          # protocol.
+          cu_iterator = dwarf_info.iter_CUs()
+          first_cu = next(cu_iterator)
+          top_die = first_cu.get_top_DIE()
+          die_name = top_die.attributes['DW_AT_name'].value
+          die_producer = top_die.attributes['DW_AT_producer'].value
+    except Exception as e:
+      LOG.warn('Failure to read DWARF info from {0}: {1}'.format(self.impalad_path,
+                                                                 str(e)))
+    return die_name, die_producer
+
+  def _set_impalad_build_type(self, die_name, die_producer):
+    """
+    Use a heuristic based on the DW_AT_producer and DW_AT_name of the first compile
+    unit, as returned by _get_impalad_dwarf_info(), to figure out which of 5 supported
+    builds of impalad we're dealing with. If the heuristic can't determine, fall back to
+    assuming a debug build and log a warning.
+    """
+    ASAN_CU_NAME = 'asan_preinit.cc'
+    NON_ASAN_CU_NAME = 'daemon-main.cc'
+    GDB_FLAG = '-ggdb'
+    CODE_COVERAGE_FLAG = '-ftest-coverage'
+
+    if die_name is None or die_producer is None:
+      LOG.warn('Not enough DWARF info in {0} to determine build type; choosing '
+               'DEBUG'.format(self.impalad_path))
+      self._specific_build_type = SpecificImpaladBuildTypes.DEBUG
+      return
+
+    is_debug = GDB_FLAG in die_producer
+    specific_build_type = SpecificImpaladBuildTypes.DEBUG
+
+    if die_name.endswith(ASAN_CU_NAME):
+      specific_build_type = SpecificImpaladBuildTypes.ADDRESS_SANITIZER
+    elif not die_name.endswith(NON_ASAN_CU_NAME):
+      LOG.warn('Unexpected DW_AT_name in first CU: {0}; choosing '
+               'DEBUG'.format(die_name))
+      specific_build_type = SpecificImpaladBuildTypes.DEBUG
+    elif CODE_COVERAGE_FLAG in die_producer:
+      if is_debug:
+        specific_build_type = SpecificImpaladBuildTypes.DEBUG_CODE_COVERAGE
+      else:
+        specific_build_type = SpecificImpaladBuildTypes.RELEASE_CODE_COVERAGE
+    else:
+      if is_debug:
+        specific_build_type = SpecificImpaladBuildTypes.DEBUG
+      else:
+        specific_build_type = SpecificImpaladBuildTypes.RELEASE
+
+    self._specific_build_type = specific_build_type
+
+
+IMPALAD_BUILD = ImpaladBuild(IMPALAD_PATH)
+
+
+def specific_build_type_timeout(
+    default_timeout, slow_build_timeout=None, asan_build_timeout=None,
+    code_coverage_build_timeout=None):
+  """
+  Return a test environment-specific timeout based on the sort of
+  SpecificImpaladBuildTypes under test.
+
+  Required parameter: default_timeout - default timeout value. This applies when Impala is
+  a standard release or debug build, or if no other timeouts are specified.
+
+  Optional parameters:
+  slow_build_timeout - timeout to use if we're running against *any* build known to be
+  slow. If specified, this will preempt default_timeout if Impala is expected to be
+  "slow". You can use this as a shorthand in lieu of specifying all of the following
+  parameters.
+
+  The parameters below correspond to specific build types. These preempt both
+  slow_build_timeout and default_timeout, if the Impala under test is a build of the
+  applicable type:
+
+  asan_build_timeout - timeout to use if Impala with ASAN is running
+
+  code_coverage_build_timeout - timeout to use if Impala with code coverage is running
+  (both debug and release code coverage)
+  """
+
+  if IMPALAD_BUILD.is_asan() and asan_build_timeout is not None:
+    timeout_val = asan_build_timeout
+  elif IMPALAD_BUILD.has_code_coverage() and code_coverage_build_timeout is not None:
+    timeout_val = code_coverage_build_timeout
+  elif IMPALAD_BUILD.runs_slowly() and slow_build_timeout is not None:
+    timeout_val = slow_build_timeout
+  else:
+    timeout_val = default_timeout
+  return timeout_val

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/22669e23/tests/common/skip.py
----------------------------------------------------------------------
diff --git a/tests/common/skip.py b/tests/common/skip.py
index b2f52ba..e260eae 100644
--- a/tests/common/skip.py
+++ b/tests/common/skip.py
@@ -21,6 +21,8 @@ import re
 import os
 import pytest
 from functools import partial
+
+from tests.common.environ import IMPALAD_BUILD, USING_OLD_AGGS_JOINS
 from tests.util.filesystem_utils import IS_DEFAULT_FS, IS_S3, IS_ISILON, IS_LOCAL
 
 
@@ -66,22 +68,14 @@ class SkipIfIsilon:
       reason="This Isilon issue has yet to be triaged.")
   jira = partial(pytest.mark.skipif, IS_ISILON)
 
-# TODO: looking at TEST_START_CLUSTER_ARGS is a hack. It would be better to add an option
-# to pytest.
-test_start_cluster_args = os.environ.get("TEST_START_CLUSTER_ARGS","")
-old_agg_regex = "enable_partitioned_aggregation=false"
-old_hash_join_regex = "enable_partitioned_hash_join=false"
-using_old_aggs_joins = re.search(old_agg_regex, test_start_cluster_args) is not None or \
-    re.search(old_hash_join_regex, test_start_cluster_args) is not None
-
 class SkipIfOldAggsJoins:
-  nested_types = pytest.mark.skipif(using_old_aggs_joins,
+  nested_types = pytest.mark.skipif(USING_OLD_AGGS_JOINS,
       reason="Nested types not supported with old aggs and joins")
-  passthrough_preagg = pytest.mark.skipif(using_old_aggs_joins,
+  passthrough_preagg = pytest.mark.skipif(USING_OLD_AGGS_JOINS,
       reason="Passthrough optimization not implemented by old agg")
-  unsupported = pytest.mark.skipif(using_old_aggs_joins,
+  unsupported = pytest.mark.skipif(USING_OLD_AGGS_JOINS,
       reason="Query unsupported with old aggs and joins")
-  requires_spilling = pytest.mark.skipif(using_old_aggs_joins,
+  requires_spilling = pytest.mark.skipif(USING_OLD_AGGS_JOINS,
       reason="Test case requires spilling to pass")
 
 class SkipIfLocal:
@@ -111,21 +105,6 @@ class SkipIfLocal:
   root_path = pytest.mark.skipif(IS_LOCAL,
       reason="Tests rely on the root directory")
 
-# Try to derive the build type. Assume it's 'latest' by default.
-impala_home = os.environ.get("IMPALA_HOME", "")
-build_type_regex = re.compile(r'--build_type=(\w+)', re.I)
-build_type_search_result = re.search(build_type_regex, test_start_cluster_args)
-
-if build_type_search_result is not None:
-  build_type = build_type_search_result.groups()[0].lower()
-else:
-  build_type = 'latest'
-
-# Resolve any symlinks in the path.
-impalad_basedir = \
-    os.path.realpath(os.path.join(impala_home, 'be/build', build_type)).rstrip('/')
-debug_build = os.path.basename(impalad_basedir).lower() == 'debug'
-
-class SkipIfNotDebugBuild:
-  debug_only = pytest.mark.skipif(not debug_build,
+class SkipIfBuildType:
+  not_dev_build = pytest.mark.skipif(not IMPALAD_BUILD.is_dev(),
       reason="Tests depends on debug build startup option.")

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/22669e23/tests/custom_cluster/test_alloc_fail.py
----------------------------------------------------------------------
diff --git a/tests/custom_cluster/test_alloc_fail.py b/tests/custom_cluster/test_alloc_fail.py
index d9b9f78..9021089 100644
--- a/tests/custom_cluster/test_alloc_fail.py
+++ b/tests/custom_cluster/test_alloc_fail.py
@@ -16,9 +16,9 @@ import logging
 import pytest
 from copy import deepcopy
 from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
-from tests.common.skip import SkipIfNotDebugBuild
+from tests.common.skip import SkipIfBuildType
 
-@SkipIfNotDebugBuild.debug_only
+@SkipIfBuildType.not_dev_build
 class TestAllocFail(CustomClusterTestSuite):
   """Tests for handling malloc() failure for UDF/UDA"""
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/22669e23/tests/custom_cluster/test_exchange_delays.py
----------------------------------------------------------------------
diff --git a/tests/custom_cluster/test_exchange_delays.py b/tests/custom_cluster/test_exchange_delays.py
index 9c96801..147c956 100644
--- a/tests/custom_cluster/test_exchange_delays.py
+++ b/tests/custom_cluster/test_exchange_delays.py
@@ -14,9 +14,9 @@
 
 import pytest
 from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
-from tests.common.skip import SkipIfNotDebugBuild
+from tests.common.skip import SkipIfBuildType
 
-@SkipIfNotDebugBuild.debug_only
+@SkipIfBuildType.not_dev_build
 class TestExchangeDelays(CustomClusterTestSuite):
   """Tests for handling delays in finding data stream receivers"""
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/22669e23/tests/statestore/test_statestore.py
----------------------------------------------------------------------
diff --git a/tests/statestore/test_statestore.py b/tests/statestore/test_statestore.py
index c1724b0..49a8bc1 100644
--- a/tests/statestore/test_statestore.py
+++ b/tests/statestore/test_statestore.py
@@ -21,6 +21,8 @@ import uuid
 import urllib2
 import json
 
+from tests.common.environ import specific_build_type_timeout
+
 from thrift.transport import TSocket
 from thrift.transport import TTransport
 from thrift.protocol import TBinaryProtocol
@@ -59,6 +61,9 @@ STATUS_OK = TStatus(TErrorCode.OK)
 DEFAULT_UPDATE_STATE_RESPONSE = TUpdateStateResponse(status=STATUS_OK, topic_updates=[],
                                                      skipped=False)
 
+# IMPALA-3501: the timeout needs to be higher in code coverage builds
+WAIT_FOR_FAILURE_TIMEOUT = specific_build_type_timeout(40, code_coverage_build_timeout=60)
+
 class WildcardServerSocket(TSocket.TSocketBase, TTransport.TServerTransportBase):
   """Specialised server socket that binds to a random port at construction"""
   def __init__(self, host=None, port=0):
@@ -292,7 +297,8 @@ class StatestoreSubscriber(object):
     finally:
       self.update_event.release()
 
-  def wait_for_failure(self, timeout=40):
+
+  def wait_for_failure(self, timeout=WAIT_FOR_FAILURE_TIMEOUT):
     """Waits until this subscriber no longer appears in the statestore's subscriber
     list. If 'timeout' seconds pass, throws an exception."""
     start = time.time()


[3/4] incubator-impala git commit: Refactor RuntimeState and ExecEnv dependencies

Posted by ta...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/runtime/disk-io-mgr-reader-context.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr-reader-context.cc b/be/src/runtime/disk-io-mgr-reader-context.cc
index f1e71b2..c01f34b 100644
--- a/be/src/runtime/disk-io-mgr-reader-context.cc
+++ b/be/src/runtime/disk-io-mgr-reader-context.cc
@@ -18,7 +18,7 @@
 
 using namespace impala;
 
-void DiskIoMgr::RequestContext::Cancel(const Status& status) {
+void DiskIoRequestContext::Cancel(const Status& status) {
   DCHECK(!status.ok());
 
   // Callbacks are collected in this vector and invoked while no lock is held.
@@ -28,18 +28,18 @@ void DiskIoMgr::RequestContext::Cancel(const Status& status) {
     DCHECK(Validate()) << endl << DebugString();
 
     // Already being cancelled
-    if (state_ == RequestContext::Cancelled) return;
+    if (state_ == DiskIoRequestContext::Cancelled) return;
 
     DCHECK(status_.ok());
     status_ = status;
 
     // The reader will be put into a cancelled state until call cleanup is complete.
-    state_ = RequestContext::Cancelled;
+    state_ = DiskIoRequestContext::Cancelled;
 
     // Cancel all scan ranges for this reader. Each range could be one one of
     // four queues.
     for (int i = 0; i < disk_states_.size(); ++i) {
-      RequestContext::PerDiskState& state = disk_states_[i];
+      DiskIoRequestContext::PerDiskState& state = disk_states_[i];
       RequestRange* range = NULL;
       while ((range = state.in_flight_ranges()->Dequeue()) != NULL) {
         if (range->request_type() == RequestType::READ) {
@@ -74,7 +74,7 @@ void DiskIoMgr::RequestContext::Cancel(const Status& status) {
     // Schedule reader on all disks. The disks will notice it is cancelled and do any
     // required cleanup
     for (int i = 0; i < disk_states_.size(); ++i) {
-      RequestContext::PerDiskState& state = disk_states_[i];
+      DiskIoRequestContext::PerDiskState& state = disk_states_[i];
       state.ScheduleContext(this, i);
     }
   }
@@ -88,10 +88,10 @@ void DiskIoMgr::RequestContext::Cancel(const Status& status) {
   ready_to_start_ranges_cv_.notify_all();
 }
 
-void DiskIoMgr::RequestContext::AddRequestRange(
+void DiskIoRequestContext::AddRequestRange(
     DiskIoMgr::RequestRange* range, bool schedule_immediately) {
   // DCHECK(lock_.is_locked()); // TODO: boost should have this API
-  RequestContext::PerDiskState& state = disk_states_[range->disk_id()];
+  DiskIoRequestContext::PerDiskState& state = disk_states_[range->disk_id()];
   if (state.done()) {
     DCHECK_EQ(state.num_remaining_ranges(), 0);
     state.set_done(false);
@@ -107,7 +107,7 @@ void DiskIoMgr::RequestContext::AddRequestRange(
       state.unstarted_scan_ranges()->Enqueue(scan_range);
       num_unstarted_scan_ranges_.Add(1);
     }
-    // If next_scan_range_to_start is NULL, schedule this RequestContext so that it will
+    // If next_scan_range_to_start is NULL, schedule this DiskIoRequestContext so that it will
     // be set. If it's not NULL, this context will be scheduled when GetNextRange() is
     // invoked.
     schedule_context = state.next_scan_range_to_start() == NULL;
@@ -126,7 +126,7 @@ void DiskIoMgr::RequestContext::AddRequestRange(
   ++state.num_remaining_ranges();
 }
 
-DiskIoMgr::RequestContext::RequestContext(DiskIoMgr* parent, int num_disks)
+DiskIoRequestContext::DiskIoRequestContext(DiskIoMgr* parent, int num_disks)
   : parent_(parent),
     bytes_read_counter_(NULL),
     read_timer_(NULL),
@@ -137,7 +137,7 @@ DiskIoMgr::RequestContext::RequestContext(DiskIoMgr* parent, int num_disks)
 }
 
 // Resets this object.
-void DiskIoMgr::RequestContext::Reset(MemTracker* tracker) {
+void DiskIoRequestContext::Reset(MemTracker* tracker) {
   DCHECK_EQ(state_, Inactive);
   status_ = Status::OK();
 
@@ -173,13 +173,13 @@ void DiskIoMgr::RequestContext::Reset(MemTracker* tracker) {
 }
 
 // Dumps out request context information. Lock should be taken by caller
-string DiskIoMgr::RequestContext::DebugString() const {
+string DiskIoRequestContext::DebugString() const {
   stringstream ss;
-  ss << endl << "  RequestContext: " << (void*)this << " (state=";
-  if (state_ == RequestContext::Inactive) ss << "Inactive";
-  if (state_ == RequestContext::Cancelled) ss << "Cancelled";
-  if (state_ == RequestContext::Active) ss << "Active";
-  if (state_ != RequestContext::Inactive) {
+  ss << endl << "  DiskIoRequestContext: " << (void*)this << " (state=";
+  if (state_ == DiskIoRequestContext::Inactive) ss << "Inactive";
+  if (state_ == DiskIoRequestContext::Cancelled) ss << "Cancelled";
+  if (state_ == DiskIoRequestContext::Active) ss << "Active";
+  if (state_ != DiskIoRequestContext::Inactive) {
     ss << " status_=" << (status_.ok() ? "OK" : status_.GetDetail())
        << " #ready_buffers=" << num_ready_buffers_.Load()
        << " #used_buffers=" << num_used_buffers_.Load()
@@ -203,9 +203,9 @@ string DiskIoMgr::RequestContext::DebugString() const {
   return ss.str();
 }
 
-bool DiskIoMgr::RequestContext::Validate() const {
-  if (state_ == RequestContext::Inactive) {
-    LOG(WARNING) << "state_ == RequestContext::Inactive";
+bool DiskIoRequestContext::Validate() const {
+  if (state_ == DiskIoRequestContext::Inactive) {
+    LOG(WARNING) << "state_ == DiskIoRequestContext::Inactive";
     return false;
   }
 
@@ -234,7 +234,7 @@ bool DiskIoMgr::RequestContext::Validate() const {
       return false;
     }
 
-    if (state_ != RequestContext::Cancelled) {
+    if (state_ != DiskIoRequestContext::Cancelled) {
       if (state.unstarted_scan_ranges()->size() + state.in_flight_ranges()->size() >
           state.num_remaining_ranges()) {
         LOG(WARNING) << "disk_id=" << i
@@ -285,7 +285,7 @@ bool DiskIoMgr::RequestContext::Validate() const {
     }
   }
 
-  if (state_ != RequestContext::Cancelled) {
+  if (state_ != DiskIoRequestContext::Cancelled) {
     if (total_unstarted_ranges != num_unstarted_scan_ranges_.Load()) {
       LOG(WARNING) << "total_unstarted_ranges=" << total_unstarted_ranges
                    << " sum_in_states=" << num_unstarted_scan_ranges_.Load();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/runtime/disk-io-mgr-scan-range.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr-scan-range.cc b/be/src/runtime/disk-io-mgr-scan-range.cc
index 01ea4b5..25399bc 100644
--- a/be/src/runtime/disk-io-mgr-scan-range.cc
+++ b/be/src/runtime/disk-io-mgr-scan-range.cc
@@ -121,7 +121,7 @@ Status DiskIoMgr::ScanRange::GetNext(BufferDescriptor** buffer) {
   }
 
   DCHECK(reader_->Validate()) << endl << reader_->DebugString();
-  if (reader_->state_ == RequestContext::Cancelled) {
+  if (reader_->state_ == DiskIoRequestContext::Cancelled) {
     reader_->blocked_ranges_.Remove(this);
     Cancel(reader_->status_);
     (*buffer)->Return();
@@ -230,7 +230,7 @@ void DiskIoMgr::ScanRange::Reset(hdfsFS fs, const char* file, int64_t len, int64
   mtime_ = mtime;
 }
 
-void DiskIoMgr::ScanRange::InitInternal(DiskIoMgr* io_mgr, RequestContext* reader) {
+void DiskIoMgr::ScanRange::InitInternal(DiskIoMgr* io_mgr, DiskIoRequestContext* reader) {
   DCHECK(hdfs_file_ == NULL);
   io_mgr_ = io_mgr;
   reader_ = reader;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/runtime/disk-io-mgr-stress.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr-stress.cc b/be/src/runtime/disk-io-mgr-stress.cc
index af6ad27..0a8a628 100644
--- a/be/src/runtime/disk-io-mgr-stress.cc
+++ b/be/src/runtime/disk-io-mgr-stress.cc
@@ -52,7 +52,7 @@ string GenerateRandomData() {
 
 struct DiskIoMgrStress::Client {
   boost::mutex lock;
-  DiskIoMgr::RequestContext* reader;
+  DiskIoRequestContext* reader;
   int file_idx;
   vector<DiskIoMgr::ScanRange*> scan_ranges;
   int abort_at_byte;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/runtime/disk-io-mgr-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr-test.cc b/be/src/runtime/disk-io-mgr-test.cc
index ee89f56..46149b5 100644
--- a/be/src/runtime/disk-io-mgr-test.cc
+++ b/be/src/runtime/disk-io-mgr-test.cc
@@ -40,7 +40,7 @@ namespace impala {
 class DiskIoMgrTest : public testing::Test {
  public:
   void WriteValidateCallback(int num_writes, DiskIoMgr::WriteRange** written_range,
-      DiskIoMgr* io_mgr, DiskIoMgr::RequestContext* reader, int32_t* data,
+      DiskIoMgr* io_mgr, DiskIoRequestContext* reader, int32_t* data,
       Status expected_status, const Status& status) {
     if (expected_status.code() == TErrorCode::CANCELLED) {
       EXPECT_TRUE(status.ok() || status.IsCancelled()) << "Error: " << status.GetDetail();
@@ -99,7 +99,7 @@ class DiskIoMgrTest : public testing::Test {
     }
   }
 
-  static void ValidateSyncRead(DiskIoMgr* io_mgr, DiskIoMgr::RequestContext* reader,
+  static void ValidateSyncRead(DiskIoMgr* io_mgr, DiskIoRequestContext* reader,
       DiskIoMgr::ScanRange* range, const char* expected, int expected_len = -1) {
     DiskIoMgr::BufferDescriptor* buffer;
     ASSERT_OK(io_mgr->Read(reader, range, &buffer));
@@ -134,7 +134,7 @@ class DiskIoMgrTest : public testing::Test {
 
   // Continues pulling scan ranges from the io mgr until they are all done.
   // Updates num_ranges_processed with the number of ranges seen by this thread.
-  static void ScanRangeThread(DiskIoMgr* io_mgr, DiskIoMgr::RequestContext* reader,
+  static void ScanRangeThread(DiskIoMgr* io_mgr, DiskIoRequestContext* reader,
       const char* expected_result, int expected_len, const Status& expected_status,
       int max_ranges, AtomicInt32* num_ranges_processed) {
     int num_ranges = 0;
@@ -185,14 +185,14 @@ TEST_F(DiskIoMgrTest, SingleWriter) {
   scoped_ptr<DiskIoMgr> read_io_mgr(new DiskIoMgr(1, 1, 1, 10));
   MemTracker reader_mem_tracker(LARGE_MEM_LIMIT);
   ASSERT_OK(read_io_mgr->Init(&reader_mem_tracker));
-  DiskIoMgr::RequestContext* reader;
+  DiskIoRequestContext* reader;
   ASSERT_OK(read_io_mgr->RegisterContext(&reader, &reader_mem_tracker));
   for (int num_threads_per_disk = 1; num_threads_per_disk <= 5; ++num_threads_per_disk) {
     for (int num_disks = 1; num_disks <= 5; num_disks += 2) {
       pool_.reset(new ObjectPool);
       DiskIoMgr io_mgr(num_disks, num_threads_per_disk, 1, 10);
       ASSERT_OK(io_mgr.Init(&mem_tracker));
-      DiskIoMgr::RequestContext* writer;
+      DiskIoRequestContext* writer;
       io_mgr.RegisterContext(&writer, &mem_tracker);
       for (int i = 0; i < num_ranges; ++i) {
         int32_t* data = pool_->Add(new int32_t);
@@ -228,7 +228,7 @@ TEST_F(DiskIoMgrTest, InvalidWrite) {
   string tmp_file = "/tmp/non-existent.txt";
   DiskIoMgr io_mgr(1, 1, 1, 10);
   ASSERT_OK(io_mgr.Init(&mem_tracker));
-  DiskIoMgr::RequestContext* writer;
+  DiskIoRequestContext* writer;
   ASSERT_OK(io_mgr.RegisterContext(&writer));
   pool_.reset(new ObjectPool);
   int32_t* data = pool_->Add(new int32_t);
@@ -238,7 +238,7 @@ TEST_F(DiskIoMgrTest, InvalidWrite) {
   DiskIoMgr::WriteRange** new_range = pool_->Add(new DiskIoMgr::WriteRange*);
   DiskIoMgr::WriteRange::WriteDoneCallback callback =
       bind(mem_fn(&DiskIoMgrTest::WriteValidateCallback), this, 2,
-          new_range, (DiskIoMgr*)NULL, (DiskIoMgr::RequestContext*)NULL,
+          new_range, (DiskIoMgr*)NULL, (DiskIoRequestContext*)NULL,
           data, Status(TErrorCode::RUNTIME_ERROR, "Test Failure"), _1);
   *new_range = pool_->Add(new DiskIoMgr::WriteRange(tmp_file, rand(), 0, callback));
 
@@ -255,7 +255,7 @@ TEST_F(DiskIoMgrTest, InvalidWrite) {
 
   new_range = pool_->Add(new DiskIoMgr::WriteRange*);
   callback = bind(mem_fn(&DiskIoMgrTest::WriteValidateCallback), this, 2,
-      new_range, (DiskIoMgr*)NULL, (DiskIoMgr::RequestContext*)NULL,
+      new_range, (DiskIoMgr*)NULL, (DiskIoRequestContext*)NULL,
       data, Status(TErrorCode::RUNTIME_ERROR, "Test Failure"), _1);
 
   *new_range = pool_->Add(new DiskIoMgr::WriteRange(tmp_file, -1, 0, callback));
@@ -291,14 +291,14 @@ TEST_F(DiskIoMgrTest, SingleWriterCancel) {
   scoped_ptr<DiskIoMgr> read_io_mgr(new DiskIoMgr(1, 1, 1, 10));
   MemTracker reader_mem_tracker(LARGE_MEM_LIMIT);
   ASSERT_OK(read_io_mgr->Init(&reader_mem_tracker));
-  DiskIoMgr::RequestContext* reader;
+  DiskIoRequestContext* reader;
   ASSERT_OK(read_io_mgr->RegisterContext(&reader, &reader_mem_tracker));
   for (int num_threads_per_disk = 1; num_threads_per_disk <= 5; ++num_threads_per_disk) {
     for (int num_disks = 1; num_disks <= 5; num_disks += 2) {
       pool_.reset(new ObjectPool);
       DiskIoMgr io_mgr(num_disks, num_threads_per_disk, 1, 10);
       ASSERT_OK(io_mgr.Init(&mem_tracker));
-      DiskIoMgr::RequestContext* writer;
+      DiskIoRequestContext* writer;
       io_mgr.RegisterContext(&writer, &mem_tracker);
       Status validate_status = Status::OK();
       for (int i = 0; i < num_ranges; ++i) {
@@ -362,7 +362,7 @@ TEST_F(DiskIoMgrTest, SingleReader) {
 
           ASSERT_OK(io_mgr.Init(&mem_tracker));
           MemTracker reader_mem_tracker;
-          DiskIoMgr::RequestContext* reader;
+          DiskIoRequestContext* reader;
           ASSERT_OK(io_mgr.RegisterContext(&reader, &reader_mem_tracker));
 
           vector<DiskIoMgr::ScanRange*> ranges;
@@ -416,7 +416,7 @@ TEST_F(DiskIoMgrTest, AddScanRangeTest) {
 
         ASSERT_OK(io_mgr.Init(&mem_tracker));
         MemTracker reader_mem_tracker;
-        DiskIoMgr::RequestContext* reader;
+        DiskIoRequestContext* reader;
         ASSERT_OK(io_mgr.RegisterContext(&reader, &reader_mem_tracker));
 
         vector<DiskIoMgr::ScanRange*> ranges_first_half;
@@ -489,7 +489,7 @@ TEST_F(DiskIoMgrTest, SyncReadTest) {
 
         ASSERT_OK(io_mgr.Init(&mem_tracker));
         MemTracker reader_mem_tracker;
-        DiskIoMgr::RequestContext* reader;
+        DiskIoRequestContext* reader;
         ASSERT_OK(io_mgr.RegisterContext(&reader, &reader_mem_tracker));
 
         DiskIoMgr::ScanRange* complete_range = InitRange(1, tmp_file, 0, strlen(data), 0,
@@ -559,7 +559,7 @@ TEST_F(DiskIoMgrTest, SingleReaderCancel) {
 
         ASSERT_OK(io_mgr.Init(&mem_tracker));
         MemTracker reader_mem_tracker;
-        DiskIoMgr::RequestContext* reader;
+        DiskIoRequestContext* reader;
         ASSERT_OK(io_mgr.RegisterContext(&reader, &reader_mem_tracker));
 
         vector<DiskIoMgr::ScanRange*> ranges;
@@ -624,7 +624,7 @@ TEST_F(DiskIoMgrTest, MemLimits) {
 
     ASSERT_OK(io_mgr.Init(&mem_tracker));
     MemTracker reader_mem_tracker;
-    DiskIoMgr::RequestContext* reader;
+    DiskIoRequestContext* reader;
     ASSERT_OK(io_mgr.RegisterContext(&reader, &reader_mem_tracker));
 
     vector<DiskIoMgr::ScanRange*> ranges;
@@ -699,7 +699,7 @@ TEST_F(DiskIoMgrTest, CachedReads) {
 
     ASSERT_OK(io_mgr.Init(&mem_tracker));
     MemTracker reader_mem_tracker;
-    DiskIoMgr::RequestContext* reader;
+    DiskIoRequestContext* reader;
     ASSERT_OK(io_mgr.RegisterContext(&reader, &reader_mem_tracker));
 
     DiskIoMgr::ScanRange* complete_range =
@@ -764,7 +764,7 @@ TEST_F(DiskIoMgrTest, MultipleReaderWriter) {
   stat(file_name.c_str(), &stat_val);
 
   int64_t iters = 0;
-  vector<DiskIoMgr::RequestContext*> contexts(num_contexts);
+  vector<DiskIoRequestContext*> contexts(num_contexts);
   Status status;
   for (int iteration = 0; iteration < ITERATIONS; ++iteration) {
     for (int threads_per_disk = 1; threads_per_disk <= 5; ++threads_per_disk) {
@@ -838,7 +838,7 @@ TEST_F(DiskIoMgrTest, MultipleReader) {
   vector<string> file_names;
   vector<int64_t> mtimes;
   vector<string> data;
-  vector<DiskIoMgr::RequestContext*> readers;
+  vector<DiskIoRequestContext*> readers;
   vector<char*> results;
 
   file_names.resize(NUM_READERS);
@@ -1000,7 +1000,7 @@ TEST_F(DiskIoMgrTest, PartialRead) {
 
   ASSERT_OK(io_mgr->Init(&mem_tracker));
   MemTracker reader_mem_tracker;
-  DiskIoMgr::RequestContext* reader;
+  DiskIoRequestContext* reader;
   ASSERT_OK(io_mgr->RegisterContext(&reader, &reader_mem_tracker));
 
   // We should not read past the end of file.
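The test hunks above only swap the nested type DiskIoMgr::RequestContext for the top-level DiskIoRequestContext. Elsewhere in this commit, the header forward-declares that class ahead of DiskIoMgr and adds it as a friend of DiskIoMgr's nested classes. Below is a minimal sketch of that pattern. It uses hypothetical stand-in classes (IoMgr, IoMgr::Buffer, IoRequestContext), not Impala's real definitions. The manager only handles the context through pointers, so a forward declaration is enough. A top-level class cannot use the nested class's privates unless it is explicitly befriended.

```cpp
#include <memory>

// Forward declaration: IoMgr only stores and passes pointers to the context,
// so its full definition is not needed here (hypothetical names throughout).
class IoRequestContext;

class IoMgr {
 public:
  class Buffer {
   public:
    int len() const { return len_; }

   private:
    // The enclosing class gets no automatic access to a nested class's
    // private members, so it is befriended explicitly.
    friend class IoMgr;
    // The context is now a separate top-level class, so it needs its own
    // friend declaration to read private fields such as reader_.
    friend class IoRequestContext;

    explicit Buffer(int len) : len_(len), reader_(nullptr) {}

    int len_;
    IoRequestContext* reader_;  // context that owns this buffer
  };

  // Uses the top-level context type in place of a nested IoMgr::RequestContext.
  std::unique_ptr<Buffer> MakeBuffer(IoRequestContext* reader, int len);
};

class IoRequestContext {
 public:
  // Reads a private field of IoMgr::Buffer; legal only because Buffer
  // declares IoRequestContext a friend.
  bool Owns(const IoMgr::Buffer& b) const { return b.reader_ == this; }
};

std::unique_ptr<IoMgr::Buffer> IoMgr::MakeBuffer(IoRequestContext* reader, int len) {
  // IoMgr is a friend of Buffer, so it may call the private constructor.
  std::unique_ptr<Buffer> b(new Buffer(len));
  b->reader_ = reader;
  return b;
}
```

Because only pointers to the context cross the header boundary, the forward declaration keeps the header from depending on the context's full definition.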

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/runtime/disk-io-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr.cc b/be/src/runtime/disk-io-mgr.cc
index 66f0498..448424f 100644
--- a/be/src/runtime/disk-io-mgr.cc
+++ b/be/src/runtime/disk-io-mgr.cc
@@ -104,33 +104,34 @@ DiskIoMgr::HdfsCachedFileHandle::~HdfsCachedFileHandle() {
   hdfs_file_ = NULL;
 }
 
-// This class provides a cache of RequestContext objects.  RequestContexts are recycled.
-// This is good for locality as well as lock contention.  The cache has the property that
-// regardless of how many clients get added/removed, the memory locations for
-// existing clients do not change (not the case with std::vector) minimizing the locks we
-// have to take across all readers.
+// This class provides a cache of DiskIoRequestContext objects.  DiskIoRequestContexts
+// are recycled. This is good for locality as well as lock contention.  The cache has
+// the property that regardless of how many clients get added/removed, the memory
+// locations for existing clients do not change (not the case with std::vector)
+// minimizing the locks we have to take across all readers.
 // All functions on this object are thread safe
 class DiskIoMgr::RequestContextCache {
  public:
   RequestContextCache(DiskIoMgr* io_mgr) : io_mgr_(io_mgr) {}
 
   // Returns a context to the cache.  This object can now be reused.
-  void ReturnContext(RequestContext* reader) {
-    DCHECK(reader->state_ != RequestContext::Inactive);
-    reader->state_ = RequestContext::Inactive;
+  void ReturnContext(DiskIoRequestContext* reader) {
+    DCHECK(reader->state_ != DiskIoRequestContext::Inactive);
+    reader->state_ = DiskIoRequestContext::Inactive;
     lock_guard<mutex> l(lock_);
     inactive_contexts_.push_back(reader);
   }
 
-  // Returns a new RequestContext object.  Allocates a new object if necessary.
-  RequestContext* GetNewContext() {
+  // Returns a new DiskIoRequestContext object.  Allocates a new object if necessary.
+  DiskIoRequestContext* GetNewContext() {
     lock_guard<mutex> l(lock_);
     if (!inactive_contexts_.empty()) {
-      RequestContext* reader = inactive_contexts_.front();
+      DiskIoRequestContext* reader = inactive_contexts_.front();
       inactive_contexts_.pop_front();
       return reader;
     } else {
-      RequestContext* reader = new RequestContext(io_mgr_, io_mgr_->num_total_disks());
+      DiskIoRequestContext* reader =
+          new DiskIoRequestContext(io_mgr_, io_mgr_->num_total_disks());
       all_contexts_.push_back(reader);
       return reader;
     }
@@ -138,7 +139,7 @@ class DiskIoMgr::RequestContextCache {
 
   // This object has the same lifetime as the disk IoMgr.
   ~RequestContextCache() {
-    for (list<RequestContext*>::iterator it = all_contexts_.begin();
+    for (list<DiskIoRequestContext*>::iterator it = all_contexts_.begin();
         it != all_contexts_.end(); ++it) {
       delete *it;
     }
@@ -147,9 +148,9 @@ class DiskIoMgr::RequestContextCache {
   // Validates that all readers are cleaned up and in the inactive state.  No locks
   // are taken since this is only called from the disk IoMgr destructor.
   bool ValidateAllInactive() {
-    for (list<RequestContext*>::iterator it = all_contexts_.begin();
+    for (list<DiskIoRequestContext*>::iterator it = all_contexts_.begin();
         it != all_contexts_.end(); ++it) {
-      if ((*it)->state_ != RequestContext::Inactive) {
+      if ((*it)->state_ != DiskIoRequestContext::Inactive) {
         return false;
       }
     }
@@ -166,16 +167,16 @@ class DiskIoMgr::RequestContextCache {
   mutex lock_;
 
   // List of all request contexts created.  Used for debugging
-  list<RequestContext*> all_contexts_;
+  list<DiskIoRequestContext*> all_contexts_;
 
   // List of inactive readers.  These objects can be used for a new reader.
-  list<RequestContext*> inactive_contexts_;
+  list<DiskIoRequestContext*> inactive_contexts_;
 };
 
 string DiskIoMgr::RequestContextCache::DebugString() {
   lock_guard<mutex> l(lock_);
   stringstream ss;
-  for (list<RequestContext*>::iterator it = all_contexts_.begin();
+  for (list<DiskIoRequestContext*>::iterator it = all_contexts_.begin();
       it != all_contexts_.end(); ++it) {
     unique_lock<mutex> lock((*it)->lock_);
     ss << (*it)->DebugString() << endl;
@@ -193,7 +194,7 @@ string DiskIoMgr::DebugString() {
     ss << "  " << (void*) disk_queues_[i] << ":" ;
     if (!disk_queues_[i]->request_contexts.empty()) {
       ss << " Readers: ";
-      for (RequestContext* req_context: disk_queues_[i]->request_contexts) {
+      for (DiskIoRequestContext* req_context: disk_queues_[i]->request_contexts) {
         ss << (void*)req_context;
       }
     }
@@ -206,7 +207,7 @@ DiskIoMgr::BufferDescriptor::BufferDescriptor(DiskIoMgr* io_mgr) :
   io_mgr_(io_mgr), reader_(NULL), buffer_(NULL) {
 }
 
-void DiskIoMgr::BufferDescriptor::Reset(RequestContext* reader,
+void DiskIoMgr::BufferDescriptor::Reset(DiskIoRequestContext* reader,
       ScanRange* range, char* buffer, int64_t buffer_len) {
   DCHECK(io_mgr_ != NULL);
   DCHECK(buffer_ == NULL);
@@ -314,7 +315,7 @@ DiskIoMgr::~DiskIoMgr() {
   for (int i = 0; i < disk_queues_.size(); ++i) {
     if (disk_queues_[i] == NULL) continue;
     int disk_id = disk_queues_[i]->disk_id;
-    for (list<RequestContext*>::iterator it = disk_queues_[i]->request_contexts.begin();
+    for (list<DiskIoRequestContext*>::iterator it = disk_queues_[i]->request_contexts.begin();
         it != disk_queues_[i]->request_contexts.end(); ++it) {
       DCHECK_EQ((*it)->disk_states_[disk_id].num_threads_in_op(), 0);
       DCHECK((*it)->disk_states_[disk_id].done());
@@ -384,7 +385,7 @@ Status DiskIoMgr::Init(MemTracker* process_mem_tracker) {
   return Status::OK();
 }
 
-Status DiskIoMgr::RegisterContext(RequestContext** request_context,
+Status DiskIoMgr::RegisterContext(DiskIoRequestContext** request_context,
     MemTracker* mem_tracker) {
   DCHECK(request_context_cache_.get() != NULL) << "Must call Init() first.";
   *request_context = request_context_cache_->GetNewContext();
@@ -392,7 +393,7 @@ Status DiskIoMgr::RegisterContext(RequestContext** request_context,
   return Status::OK();
 }
 
-void DiskIoMgr::UnregisterContext(RequestContext* reader) {
+void DiskIoMgr::UnregisterContext(DiskIoRequestContext* reader) {
   // Blocking cancel (waiting for disks completion).
   CancelContext(reader, true);
 
@@ -425,7 +426,7 @@ void DiskIoMgr::UnregisterContext(RequestContext* reader) {
 // outstanding reference to the context decrements the number of disk queues the context
 // is on.
 // If wait_for_disks_completion is true, wait for the number of active disks to become 0.
-void DiskIoMgr::CancelContext(RequestContext* context, bool wait_for_disks_completion) {
+void DiskIoMgr::CancelContext(DiskIoRequestContext* context, bool wait_for_disks_completion) {
   context->Cancel(Status::CANCELLED);
 
   if (wait_for_disks_completion) {
@@ -437,50 +438,50 @@ void DiskIoMgr::CancelContext(RequestContext* context, bool wait_for_disks_compl
   }
 }
 
-void DiskIoMgr::set_read_timer(RequestContext* r, RuntimeProfile::Counter* c) {
+void DiskIoMgr::set_read_timer(DiskIoRequestContext* r, RuntimeProfile::Counter* c) {
   r->read_timer_ = c;
 }
 
-void DiskIoMgr::set_bytes_read_counter(RequestContext* r, RuntimeProfile::Counter* c) {
+void DiskIoMgr::set_bytes_read_counter(DiskIoRequestContext* r, RuntimeProfile::Counter* c) {
   r->bytes_read_counter_ = c;
 }
 
-void DiskIoMgr::set_active_read_thread_counter(RequestContext* r,
+void DiskIoMgr::set_active_read_thread_counter(DiskIoRequestContext* r,
     RuntimeProfile::Counter* c) {
   r->active_read_thread_counter_ = c;
 }
 
-void DiskIoMgr::set_disks_access_bitmap(RequestContext* r,
+void DiskIoMgr::set_disks_access_bitmap(DiskIoRequestContext* r,
     RuntimeProfile::Counter* c) {
   r->disks_accessed_bitmap_ = c;
 }
 
-int64_t DiskIoMgr::queue_size(RequestContext* reader) const {
+int64_t DiskIoMgr::queue_size(DiskIoRequestContext* reader) const {
   return reader->num_ready_buffers_.Load();
 }
 
-Status DiskIoMgr::context_status(RequestContext* context) const {
+Status DiskIoMgr::context_status(DiskIoRequestContext* context) const {
   unique_lock<mutex> lock(context->lock_);
   return context->status_;
 }
 
-int64_t DiskIoMgr::bytes_read_local(RequestContext* reader) const {
+int64_t DiskIoMgr::bytes_read_local(DiskIoRequestContext* reader) const {
   return reader->bytes_read_local_.Load();
 }
 
-int64_t DiskIoMgr::bytes_read_short_circuit(RequestContext* reader) const {
+int64_t DiskIoMgr::bytes_read_short_circuit(DiskIoRequestContext* reader) const {
   return reader->bytes_read_short_circuit_.Load();
 }
 
-int64_t DiskIoMgr::bytes_read_dn_cache(RequestContext* reader) const {
+int64_t DiskIoMgr::bytes_read_dn_cache(DiskIoRequestContext* reader) const {
   return reader->bytes_read_dn_cache_.Load();
 }
 
-int DiskIoMgr::num_remote_ranges(RequestContext* reader) const {
+int DiskIoMgr::num_remote_ranges(DiskIoRequestContext* reader) const {
   return reader->num_remote_ranges_.Load();
 }
 
-int64_t DiskIoMgr::unexpected_remote_bytes(RequestContext* reader) const {
+int64_t DiskIoMgr::unexpected_remote_bytes(DiskIoRequestContext* reader) const {
   return reader->unexpected_remote_bytes_.Load();
 }
 
@@ -499,7 +500,7 @@ Status DiskIoMgr::ValidateScanRange(ScanRange* range) {
   return Status::OK();
 }
 
-Status DiskIoMgr::AddScanRanges(RequestContext* reader,
+Status DiskIoMgr::AddScanRanges(DiskIoRequestContext* reader,
     const vector<ScanRange*>& ranges, bool schedule_immediately) {
   if (ranges.empty()) return Status::OK();
 
@@ -513,7 +514,7 @@ Status DiskIoMgr::AddScanRanges(RequestContext* reader,
   unique_lock<mutex> reader_lock(reader->lock_);
   DCHECK(reader->Validate()) << endl << reader->DebugString();
 
-  if (reader->state_ == RequestContext::Cancelled) {
+  if (reader->state_ == DiskIoRequestContext::Cancelled) {
     DCHECK(!reader->status_.ok());
     return reader->status_;
   }
@@ -545,7 +546,7 @@ Status DiskIoMgr::AddScanRanges(RequestContext* reader,
 // This function returns the next scan range the reader should work on, checking
 // for eos and error cases. If there isn't already a cached scan range or a scan
 // range prepared by the disk threads, the caller waits on the disk threads.
-Status DiskIoMgr::GetNextRange(RequestContext* reader, ScanRange** range) {
+Status DiskIoMgr::GetNextRange(DiskIoRequestContext* reader, ScanRange** range) {
   DCHECK(reader != NULL);
   DCHECK(range != NULL);
   *range = NULL;
@@ -555,7 +556,7 @@ Status DiskIoMgr::GetNextRange(RequestContext* reader, ScanRange** range) {
   DCHECK(reader->Validate()) << endl << reader->DebugString();
 
   while (true) {
-    if (reader->state_ == RequestContext::Cancelled) {
+    if (reader->state_ == DiskIoRequestContext::Cancelled) {
       DCHECK(!reader->status_.ok());
       status = reader->status_;
       break;
@@ -599,7 +600,7 @@ Status DiskIoMgr::GetNextRange(RequestContext* reader, ScanRange** range) {
   return status;
 }
 
-Status DiskIoMgr::Read(RequestContext* reader,
+Status DiskIoMgr::Read(DiskIoRequestContext* reader,
     ScanRange* range, BufferDescriptor** buffer) {
   DCHECK(range != NULL);
   DCHECK(buffer != NULL);
@@ -623,7 +624,7 @@ void DiskIoMgr::ReturnBuffer(BufferDescriptor* buffer_desc) {
   DCHECK(buffer_desc != NULL);
   if (!buffer_desc->status_.ok()) DCHECK(buffer_desc->buffer_ == NULL);
 
-  RequestContext* reader = buffer_desc->reader_;
+  DiskIoRequestContext* reader = buffer_desc->reader_;
   if (buffer_desc->buffer_ != NULL) {
     if (buffer_desc->scan_range_->cached_buffer_ == NULL) {
       // Not a cached buffer. Return the io buffer and update mem tracking.
@@ -655,7 +656,7 @@ void DiskIoMgr::ReturnBufferDesc(BufferDescriptor* desc) {
 }
 
 DiskIoMgr::BufferDescriptor* DiskIoMgr::GetBufferDesc(
-    RequestContext* reader, ScanRange* range, char* buffer, int64_t buffer_size) {
+    DiskIoRequestContext* reader, ScanRange* range, char* buffer, int64_t buffer_size) {
   BufferDescriptor* buffer_desc;
   {
     unique_lock<mutex> lock(free_buffers_lock_);
@@ -771,18 +772,18 @@ void DiskIoMgr::ReturnFreeBuffer(char* buffer, int64_t buffer_size) {
 // b) Adds an unstarted write range to in_flight_ranges_. The write range is processed
 //    immediately if there are no preceding scan ranges in in_flight_ranges_
 // It blocks until work is available or the thread is shut down.
-// Work is available if there is a RequestContext with
+// Work is available if there is a DiskIoRequestContext with
 //  - A ScanRange with a buffer available, or
 //  - A WriteRange in unstarted_write_ranges_.
 bool DiskIoMgr::GetNextRequestRange(DiskQueue* disk_queue, RequestRange** range,
-    RequestContext** request_context) {
+    DiskIoRequestContext** request_context) {
   int disk_id = disk_queue->disk_id;
   *range = NULL;
 
   // This loops returns either with work to do or when the disk IoMgr shuts down.
   while (true) {
     *request_context = NULL;
-    RequestContext::PerDiskState* request_disk_state = NULL;
+    DiskIoRequestContext::PerDiskState* request_disk_state = NULL;
     {
       unique_lock<mutex> disk_lock(disk_queue->lock);
 
@@ -827,12 +828,12 @@ bool DiskIoMgr::GetNextRequestRange(DiskQueue* disk_queue, RequestRange** range,
         << (*request_context)->DebugString();
 
     // Check if reader has been cancelled
-    if ((*request_context)->state_ == RequestContext::Cancelled) {
+    if ((*request_context)->state_ == DiskIoRequestContext::Cancelled) {
       request_disk_state->DecrementRequestThreadAndCheckDone(*request_context);
       continue;
     }
 
-    DCHECK_EQ((*request_context)->state_, RequestContext::Active)
+    DCHECK_EQ((*request_context)->state_, DiskIoRequestContext::Active)
         << (*request_context)->DebugString();
 
     if (request_disk_state->next_scan_range_to_start() == NULL &&
@@ -889,7 +890,7 @@ bool DiskIoMgr::GetNextRequestRange(DiskQueue* disk_queue, RequestRange** range,
   return false;
 }
 
-void DiskIoMgr::HandleWriteFinished(RequestContext* writer, WriteRange* write_range,
+void DiskIoMgr::HandleWriteFinished(DiskIoRequestContext* writer, WriteRange* write_range,
     const Status& write_status) {
   // Execute the callback before decrementing the thread count. Otherwise CancelContext()
   // that waits for the disk ref count to be 0 will return, creating a race, e.g.
@@ -900,8 +901,8 @@ void DiskIoMgr::HandleWriteFinished(RequestContext* writer, WriteRange* write_ra
   {
     unique_lock<mutex> writer_lock(writer->lock_);
     DCHECK(writer->Validate()) << endl << writer->DebugString();
-    RequestContext::PerDiskState& state = writer->disk_states_[write_range->disk_id_];
-    if (writer->state_ == RequestContext::Cancelled) {
+    DiskIoRequestContext::PerDiskState& state = writer->disk_states_[write_range->disk_id_];
+    if (writer->state_ == DiskIoRequestContext::Cancelled) {
       state.DecrementRequestThreadAndCheckDone(writer);
     } else {
       state.DecrementRequestThread();
@@ -910,16 +911,16 @@ void DiskIoMgr::HandleWriteFinished(RequestContext* writer, WriteRange* write_ra
   }
 }
 
-void DiskIoMgr::HandleReadFinished(DiskQueue* disk_queue, RequestContext* reader,
+void DiskIoMgr::HandleReadFinished(DiskQueue* disk_queue, DiskIoRequestContext* reader,
     BufferDescriptor* buffer) {
   unique_lock<mutex> reader_lock(reader->lock_);
 
-  RequestContext::PerDiskState& state = reader->disk_states_[disk_queue->disk_id];
+  DiskIoRequestContext::PerDiskState& state = reader->disk_states_[disk_queue->disk_id];
   DCHECK(reader->Validate()) << endl << reader->DebugString();
   DCHECK_GT(state.num_threads_in_op(), 0);
   DCHECK(buffer->buffer_ != NULL);
 
-  if (reader->state_ == RequestContext::Cancelled) {
+  if (reader->state_ == DiskIoRequestContext::Cancelled) {
     state.DecrementRequestThreadAndCheckDone(reader);
     DCHECK(reader->Validate()) << endl << reader->DebugString();
     ReturnFreeBuffer(buffer);
@@ -930,7 +931,7 @@ void DiskIoMgr::HandleReadFinished(DiskQueue* disk_queue, RequestContext* reader
     return;
   }
 
-  DCHECK_EQ(reader->state_, RequestContext::Active);
+  DCHECK_EQ(reader->state_, DiskIoRequestContext::Active);
   DCHECK(buffer->buffer_ != NULL);
 
   // Update the reader's scan ranges.  There are a three cases here:
@@ -979,7 +980,7 @@ void DiskIoMgr::WorkLoop(DiskQueue* disk_queue) {
   //   3. Perform the read or write as specified.
   // Cancellation checking needs to happen in both steps 1 and 3.
   while (true) {
-    RequestContext* worker_context = NULL;;
+    DiskIoRequestContext* worker_context = NULL;;
     RequestRange* range = NULL;
 
     if (!GetNextRequestRange(disk_queue, &range, &worker_context)) {
@@ -1000,7 +1001,7 @@ void DiskIoMgr::WorkLoop(DiskQueue* disk_queue) {
 
 // This function reads the specified scan range associated with the
 // specified reader context and disk queue.
-void DiskIoMgr::ReadRange(DiskQueue* disk_queue, RequestContext* reader,
+void DiskIoMgr::ReadRange(DiskQueue* disk_queue, DiskIoRequestContext* reader,
     ScanRange* range) {
   char* buffer = NULL;
   int64_t bytes_remaining = range->len_ - range->bytes_read_;
@@ -1017,11 +1018,11 @@ void DiskIoMgr::ReadRange(DiskQueue* disk_queue, RequestContext* reader,
   }
 
   if (!enough_memory) {
-    RequestContext::PerDiskState& state = reader->disk_states_[disk_queue->disk_id];
+    DiskIoRequestContext::PerDiskState& state = reader->disk_states_[disk_queue->disk_id];
     unique_lock<mutex> reader_lock(reader->lock_);
 
     // Just grabbed the reader lock, check for cancellation.
-    if (reader->state_ == RequestContext::Cancelled) {
+    if (reader->state_ == DiskIoRequestContext::Cancelled) {
       DCHECK(reader->Validate()) << endl << reader->DebugString();
       state.DecrementRequestThreadAndCheckDone(reader);
       range->Cancel(reader->status_);
@@ -1087,7 +1088,7 @@ void DiskIoMgr::ReadRange(DiskQueue* disk_queue, RequestContext* reader,
   HandleReadFinished(disk_queue, reader, buffer_desc);
 }
 
-void DiskIoMgr::Write(RequestContext* writer_context, WriteRange* write_range) {
+void DiskIoMgr::Write(DiskIoRequestContext* writer_context, WriteRange* write_range) {
   FILE* file_handle = fopen(write_range->file(), "rb+");
   Status ret_status;
   if (file_handle == NULL) {
@@ -1137,11 +1138,11 @@ int DiskIoMgr::free_buffers_idx(int64_t buffer_size) {
   return idx;
 }
 
-Status DiskIoMgr::AddWriteRange(RequestContext* writer, WriteRange* write_range) {
+Status DiskIoMgr::AddWriteRange(DiskIoRequestContext* writer, WriteRange* write_range) {
   DCHECK_LE(write_range->len(), max_buffer_size_);
   unique_lock<mutex> writer_lock(writer->lock_);
 
-  if (writer->state_ == RequestContext::Cancelled) {
+  if (writer->state_ == DiskIoRequestContext::Cancelled) {
     DCHECK(!writer->status_.ok());
     return writer->status_;
   }
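The RequestContextCache comment in the disk-io-mgr.cc hunks above says that contexts are recycled and that a context's memory location never changes as clients are added or removed. Below is a minimal sketch of that recycling scheme. It assumes a hypothetical Context type in place of DiskIoRequestContext and omits Impala's per-disk state and DCHECKs. Each context is heap-allocated once, and the cache keeps only pointers in std::list containers, so a pointer handed to a client stays valid for the cache's lifetime. Unlike the original, this sketch sets the state under the lock and marks reused contexts Active itself.

```cpp
#include <cstddef>
#include <list>
#include <mutex>

// Hypothetical stand-in for DiskIoRequestContext.
struct Context {
  enum State { Active, Inactive };
  State state = Active;
};

// Thread-safe cache that recycles Context objects instead of freeing them.
class ContextCache {
 public:
  // The cache is assumed to outlive every client, as in the original.
  ~ContextCache() {
    for (Context* c : all_contexts_) delete c;
  }

  // Reuses an inactive context if one exists; otherwise allocates a new one.
  Context* GetNewContext() {
    std::lock_guard<std::mutex> l(lock_);
    Context* c;
    if (!inactive_contexts_.empty()) {
      c = inactive_contexts_.front();
      inactive_contexts_.pop_front();
    } else {
      c = new Context();
      all_contexts_.push_back(c);
    }
    c->state = Context::Active;
    return c;
  }

  // Marks the context inactive so a later GetNewContext() can reuse it.
  void ReturnContext(Context* c) {
    std::lock_guard<std::mutex> l(lock_);
    c->state = Context::Inactive;
    inactive_contexts_.push_back(c);
  }

  // True if every context ever allocated has been returned.
  bool ValidateAllInactive() {
    std::lock_guard<std::mutex> l(lock_);
    for (Context* c : all_contexts_) {
      if (c->state != Context::Inactive) return false;
    }
    return true;
  }

  // Number of distinct Context objects ever allocated.
  std::size_t num_allocated() {
    std::lock_guard<std::mutex> l(lock_);
    return all_contexts_.size();
  }

 private:
  std::mutex lock_;
  std::list<Context*> all_contexts_;       // every context ever allocated
  std::list<Context*> inactive_contexts_;  // contexts available for reuse
};
```

Returning a context and requesting a new one hands back the same object at the same address. No new allocation occurs, which is the locality benefit the original comment describes.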

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/runtime/disk-io-mgr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr.h b/be/src/runtime/disk-io-mgr.h
index b130d52..79902c5 100644
--- a/be/src/runtime/disk-io-mgr.h
+++ b/be/src/runtime/disk-io-mgr.h
@@ -23,7 +23,6 @@
 #include <boost/unordered_set.hpp>
 #include <boost/thread/mutex.hpp>
 #include <boost/thread/condition_variable.hpp>
-#include <boost/thread/thread.hpp>
 
 #include "common/atomic.h"
 #include "common/hdfs.h"
@@ -42,7 +41,7 @@ namespace impala {
 class MemTracker;
 
 /// Manager object that schedules IO for all queries on all disks and remote filesystems
-/// (such as S3). Each query maps to one or more RequestContext objects, each of which
+/// (such as S3). Each query maps to one or more DiskIoRequestContext objects, each of which
 /// has its own queue of scan ranges and/or write ranges.
 //
 /// The API splits up requesting scan/write ranges (non-blocking) and reading the data
@@ -185,12 +184,14 @@ class MemTracker;
 ///  - Internal classes are defined in disk-io-mgr-internal.h
 ///  - ScanRange APIs are implemented in disk-io-mgr-scan-range.cc
 ///    This contains the ready buffer queue logic
-///  - RequestContext APIs are implemented in disk-io-mgr-reader-context.cc
+///  - DiskIoRequestContext APIs are implemented in disk-io-mgr-reader-context.cc
 ///    This contains the logic for picking scan ranges for a reader.
 ///  - Disk Thread and general APIs are implemented in disk-io-mgr.cc.
+
+class DiskIoRequestContext;
+
 class DiskIoMgr {
  public:
-  class RequestContext;
   class ScanRange;
 
   /// This class is a small wrapper around the hdfsFile handle and the file system
@@ -247,16 +248,17 @@ class DiskIoMgr {
 
    private:
     friend class DiskIoMgr;
+    friend class DiskIoRequestContext;
     BufferDescriptor(DiskIoMgr* io_mgr);
 
     /// Resets the buffer descriptor state for a new reader, range and data buffer.
-    void Reset(RequestContext* reader, ScanRange* range, char* buffer,
+    void Reset(DiskIoRequestContext* reader, ScanRange* range, char* buffer,
         int64_t buffer_len);
 
     DiskIoMgr* io_mgr_;
 
     /// Reader that this buffer is for
-    RequestContext* reader_;
+    DiskIoRequestContext* reader_;
 
     /// The current tracker this buffer is associated with.
     MemTracker* mem_tracker_;
@@ -367,9 +369,10 @@ class DiskIoMgr {
 
    private:
     friend class DiskIoMgr;
+    friend class DiskIoRequestContext;
 
     /// Initialize internal fields
-    void InitInternal(DiskIoMgr* io_mgr, RequestContext* reader);
+    void InitInternal(DiskIoMgr* io_mgr, DiskIoRequestContext* reader);
 
     /// Enqueues a buffer for this range. This does not block.
     /// Returns true if this scan range has hit the queue capacity, false otherwise.
@@ -423,7 +426,7 @@ class DiskIoMgr {
     DiskIoMgr* io_mgr_;
 
     /// Reader/owner of the scan range
-    RequestContext* reader_;
+    DiskIoRequestContext* reader_;
 
     /// File handle either to hdfs or local fs (FILE*)
     ///
@@ -446,7 +449,7 @@ class DiskIoMgr {
     int bytes_read_;
 
     /// Status for this range. This is non-ok if is_cancelled_ is true.
-    /// Note: an individual range can fail without the RequestContext being
+    /// Note: an individual range can fail without the DiskIoRequestContext being
     /// cancelled. This allows us to skip individual ranges.
     Status status_;
 
@@ -509,6 +512,7 @@ class DiskIoMgr {
 
    private:
     friend class DiskIoMgr;
+    friend class DiskIoRequestContext;
 
     /// Data to be written. RequestRange::len_ contains the length of data
     /// to be written.
@@ -540,13 +544,13 @@ class DiskIoMgr {
 
   /// Allocates tracking structure for a request context.
   /// Register a new request context which is returned in *request_context.
-  /// The IoMgr owns the allocated RequestContext object. The caller must call
+  /// The IoMgr owns the allocated DiskIoRequestContext object. The caller must call
   /// UnregisterContext() for each context.
   /// reader_mem_tracker: Is non-null only for readers. IO buffers
   ///    used for this reader will be tracked by this. If the limit is exceeded
   ///    the reader will be cancelled and MEM_LIMIT_EXCEEDED will be returned via
   ///    GetNext().
-  Status RegisterContext(RequestContext** request_context,
+  Status RegisterContext(DiskIoRequestContext** request_context,
       MemTracker* reader_mem_tracker = NULL);
 
   /// Unregisters context from the disk IoMgr. This must be called for every
@@ -555,7 +559,7 @@ class DiskIoMgr {
   /// The 'context' cannot be used after this call.
   /// This call blocks until all the disk threads have finished cleaning up.
   /// UnregisterContext also cancels the reader/writer from the disk IoMgr.
-  void UnregisterContext(RequestContext* context);
+  void UnregisterContext(DiskIoRequestContext* context);
 
   /// This function cancels the context asychronously. All outstanding requests
   /// are aborted and tracking structures cleaned up. This does not need to be
@@ -565,7 +569,7 @@ class DiskIoMgr {
   /// context to reach 0. After calling with wait_for_disks_completion = true, the only
   /// valid API is returning IO buffers that have already been returned.
   /// Takes context->lock_ if wait_for_disks_completion is true.
-  void CancelContext(RequestContext* context, bool wait_for_disks_completion = false);
+  void CancelContext(DiskIoRequestContext* context, bool wait_for_disks_completion = false);
 
   /// Adds the scan ranges to the queues. This call is non-blocking. The caller must
   /// not deallocate the scan range pointers before UnregisterContext().
@@ -573,26 +577,26 @@ class DiskIoMgr {
   /// (i.e. the caller should not/cannot call GetNextRange for these ranges).
   /// This can be used to do synchronous reads as well as schedule dependent ranges,
   /// as in the case for columnar formats.
-  Status AddScanRanges(RequestContext* reader, const std::vector<ScanRange*>& ranges,
+  Status AddScanRanges(DiskIoRequestContext* reader, const std::vector<ScanRange*>& ranges,
       bool schedule_immediately = false);
 
   /// Add a WriteRange for the writer. This is non-blocking and schedules the context
   /// on the IoMgr disk queue. Does not create any files.
-  Status AddWriteRange(RequestContext* writer, WriteRange* write_range);
+  Status AddWriteRange(DiskIoRequestContext* writer, WriteRange* write_range);
 
   /// Returns the next unstarted scan range for this reader. When the range is returned,
   /// the disk threads in the IoMgr will already have started reading from it. The
   /// caller is expected to call ScanRange::GetNext on the returned range.
   /// If there are no more unstarted ranges, NULL is returned.
   /// This call is blocking.
-  Status GetNextRange(RequestContext* reader, ScanRange** range);
+  Status GetNextRange(DiskIoRequestContext* reader, ScanRange** range);
 
   /// Reads the range and returns the result in buffer.
   /// This behaves like the typical synchronous read() api, blocking until the data
   /// is read. This can be called while there are outstanding ScanRanges and is
   /// thread safe. Multiple threads can be calling Read() per reader at a time.
   /// range *cannot* have already been added via AddScanRanges.
-  Status Read(RequestContext* reader, ScanRange* range, BufferDescriptor** buffer);
+  Status Read(DiskIoRequestContext* reader, ScanRange* range, BufferDescriptor** buffer);
 
   /// Determine which disk queue this file should be assigned to.  Returns an index into
   /// disk_queues_.  The disk_id is the volume ID for the local disk that holds the
@@ -600,21 +604,21 @@ class DiskIoMgr {
   /// co-located with the datanode for this file.
   int AssignQueue(const char* file, int disk_id, bool expected_local);
 
-  /// TODO: The functions below can be moved to RequestContext.
+  /// TODO: The functions below can be moved to DiskIoRequestContext.
   /// Returns the current status of the context.
-  Status context_status(RequestContext* context) const;
+  Status context_status(DiskIoRequestContext* context) const;
 
-  void set_bytes_read_counter(RequestContext*, RuntimeProfile::Counter*);
-  void set_read_timer(RequestContext*, RuntimeProfile::Counter*);
-  void set_active_read_thread_counter(RequestContext*, RuntimeProfile::Counter*);
-  void set_disks_access_bitmap(RequestContext*, RuntimeProfile::Counter*);
+  void set_bytes_read_counter(DiskIoRequestContext*, RuntimeProfile::Counter*);
+  void set_read_timer(DiskIoRequestContext*, RuntimeProfile::Counter*);
+  void set_active_read_thread_counter(DiskIoRequestContext*, RuntimeProfile::Counter*);
+  void set_disks_access_bitmap(DiskIoRequestContext*, RuntimeProfile::Counter*);
 
-  int64_t queue_size(RequestContext* reader) const;
-  int64_t bytes_read_local(RequestContext* reader) const;
-  int64_t bytes_read_short_circuit(RequestContext* reader) const;
-  int64_t bytes_read_dn_cache(RequestContext* reader) const;
-  int num_remote_ranges(RequestContext* reader) const;
-  int64_t unexpected_remote_bytes(RequestContext* reader) const;
+  int64_t queue_size(DiskIoRequestContext* reader) const;
+  int64_t bytes_read_local(DiskIoRequestContext* reader) const;
+  int64_t bytes_read_short_circuit(DiskIoRequestContext* reader) const;
+  int64_t bytes_read_dn_cache(DiskIoRequestContext* reader) const;
+  int num_remote_ranges(DiskIoRequestContext* reader) const;
+  int64_t unexpected_remote_bytes(DiskIoRequestContext* reader) const;
 
   /// Returns the read throughput across all readers.
   /// TODO: should this be a sliding window?  This should report metrics for the
@@ -671,6 +675,7 @@ class DiskIoMgr {
 
  private:
   friend class BufferDescriptor;
+  friend class DiskIoRequestContext;
   struct DiskQueue;
   class RequestContextCache;
 
@@ -757,7 +762,7 @@ class DiskIoMgr {
   /// should be <= max_buffer_size_. These constraints will be met if buffer was acquired
   /// via GetFreeBuffer() (which it should have been).
   BufferDescriptor* GetBufferDesc(
-      RequestContext* reader, ScanRange* range, char* buffer, int64_t buffer_size);
+      DiskIoRequestContext* reader, ScanRange* range, char* buffer, int64_t buffer_size);
 
   /// Returns a buffer desc object which can now be used for another reader.
   void ReturnBufferDesc(BufferDescriptor* desc);
@@ -798,11 +803,11 @@ class DiskIoMgr {
   /// Only returns false if the disk thread should be shut down.
   /// No locks should be taken before this function call and none are left taken after.
   bool GetNextRequestRange(DiskQueue* disk_queue, RequestRange** range,
-      RequestContext** request_context);
+      DiskIoRequestContext** request_context);
 
   /// Updates disk queue and reader state after a read is complete. The read result
   /// is captured in the buffer descriptor.
-  void HandleReadFinished(DiskQueue*, RequestContext*, BufferDescriptor*);
+  void HandleReadFinished(DiskQueue*, DiskIoRequestContext*, BufferDescriptor*);
 
   /// Invokes write_range->callback_ after the range has been written and
   /// updates per-disk state and handle state. The status of the write OK/RUNTIME_ERROR
@@ -810,7 +815,7 @@ class DiskIoMgr {
   /// The write_status does not affect the writer->status_. That is, a write error does
   /// not cancel the writer context - that decision is left to the callback handler.
   /// TODO: On the read path, consider not canceling the reader context on error.
-  void HandleWriteFinished(RequestContext* writer, WriteRange* write_range,
+  void HandleWriteFinished(DiskIoRequestContext* writer, WriteRange* write_range,
       const Status& write_status);
 
   /// Validates that range is correctly initialized
@@ -818,7 +823,7 @@ class DiskIoMgr {
 
   /// Write the specified range to disk and calls HandleWriteFinished when done.
   /// Responsible for opening and closing the file that is written.
-  void Write(RequestContext* writer_context, WriteRange* write_range);
+  void Write(DiskIoRequestContext* writer_context, WriteRange* write_range);
 
   /// Helper method to write a range using the specified FILE handle. Returns Status::OK
   /// if the write succeeded, or a RUNTIME_ERROR with an appropriate message otherwise.
@@ -826,7 +831,7 @@ class DiskIoMgr {
   Status WriteRangeHelper(FILE* file_handle, WriteRange* write_range);
 
   /// Reads the specified scan range and calls HandleReadFinished when done.
-  void ReadRange(DiskQueue* disk_queue, RequestContext* reader,
+  void ReadRange(DiskQueue* disk_queue, DiskIoRequestContext* reader,
       ScanRange* range);
 };
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/runtime/exec-env.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/exec-env.cc b/be/src/runtime/exec-env.cc
index 07378e4..7eaa3db 100644
--- a/be/src/runtime/exec-env.cc
+++ b/be/src/runtime/exec-env.cc
@@ -22,6 +22,7 @@
 
 #include "common/logging.h"
 #include "resourcebroker/resource-broker.h"
+#include "runtime/backend-client.h"
 #include "runtime/client-cache.h"
 #include "runtime/coordinator.h"
 #include "runtime/data-stream-mgr.h"
@@ -38,6 +39,7 @@
 #include "statestore/statestore-subscriber.h"
 #include "util/debug-util.h"
 #include "util/default-path-handlers.h"
+#include "util/hdfs-bulk-ops.h"
 #include "util/mem-info.h"
 #include "util/metrics.h"
 #include "util/network-util.h"

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/runtime/exec-env.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/exec-env.h b/be/src/runtime/exec-env.h
index e405bf8..c5404dc 100644
--- a/be/src/runtime/exec-env.h
+++ b/be/src/runtime/exec-env.h
@@ -17,35 +17,35 @@
 #define IMPALA_RUNTIME_EXEC_ENV_H
 
 #include <boost/scoped_ptr.hpp>
-#include <boost/shared_ptr.hpp>
-#include <boost/thread/thread.hpp>
 
+// NOTE: try not to add more headers here: exec-env.h is included in many many files.
 #include "common/status.h"
-#include "runtime/backend-client.h"
-#include "util/cgroups-mgr.h"
-#include "util/hdfs-bulk-ops.h" // For declaration of HdfsOpThreadPool
-#include "resourcebroker/resource-broker.h"
+#include "runtime/client-cache-types.h"
+#include "util/hdfs-bulk-ops-defs.h" // For declaration of HdfsOpThreadPool
 
 namespace impala {
 
+class CallableThreadPool;
+class CgroupsMgr;
 class DataStreamMgr;
 class DiskIoMgr;
+class FragmentMgr;
+class Frontend;
 class HBaseTableFactory;
 class HdfsFsCache;
+class ImpalaServer;
 class LibCache;
+class MemTracker;
+class MetricGroup;
+class QueryResourceMgr;
+class RequestPoolService;
+class ResourceBroker;
 class Scheduler;
 class StatestoreSubscriber;
 class TestExecEnv;
-class Webserver;
-class MetricGroup;
-class MemTracker;
 class ThreadResourceMgr;
-class CgroupsManager;
-class ImpalaServer;
-class RequestPoolService;
-class FragmentMgr;
-class Frontend;
 class TmpFileMgr;
+class Webserver;
 
 /// Execution environment for queries/plan fragments.
 /// Contains all required global structures, and handles to

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/runtime/lib-cache.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/lib-cache.cc b/be/src/runtime/lib-cache.cc
index 5c1e423..b3a6f0a 100644
--- a/be/src/runtime/lib-cache.cc
+++ b/be/src/runtime/lib-cache.cc
@@ -29,14 +29,15 @@
 #include "common/names.h"
 
 namespace filesystem = boost::filesystem;
-using namespace impala;
 
 DEFINE_string(local_library_dir, "/tmp",
               "Local directory to copy UDF libraries from HDFS into");
 
+namespace impala {
+
 scoped_ptr<LibCache> LibCache::instance_;
 
-struct LibCache::LibCacheEntry {
+struct LibCacheEntry {
   // Lock protecting all fields in this entry
   boost::mutex lock;
 
@@ -53,7 +54,7 @@ struct LibCache::LibCacheEntry {
   bool check_needs_refresh;
 
   // The type of this file.
-  LibType type;
+  LibCache::LibType type;
 
   // The path on the local file system for this library.
   std::string local_path;
@@ -117,7 +118,7 @@ Status LibCache::InitInternal() {
   return Status::OK();
 }
 
-LibCache::LibCacheEntry::~LibCacheEntry() {
+LibCacheEntry::~LibCacheEntry() {
   if (shared_object_handle != NULL) {
     DCHECK_EQ(use_count, 0);
     DCHECK(should_remove);
@@ -418,3 +419,5 @@ string LibCache::MakeLocalPath(const string& hdfs_path, const string& local_dir)
       << (num_libs_copied_.Add(1) - 1) << src.extension().native();
   return dst.str();
 }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/runtime/lib-cache.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/lib-cache.h b/be/src/runtime/lib-cache.h
index b36a859..9201341 100644
--- a/be/src/runtime/lib-cache.h
+++ b/be/src/runtime/lib-cache.h
@@ -50,10 +50,10 @@ class RuntimeState;
 /// TODO:
 /// - refresh libraries
 /// - better cached module management.
+struct LibCacheEntry;
+
 class LibCache {
  public:
-  struct LibCacheEntry;
-
   enum LibType {
     TYPE_SO,      // Shared object
     TYPE_IR,      // IR intermediate

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/runtime/mem-pool.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/mem-pool.h b/be/src/runtime/mem-pool.h
index 9e22e9f..fdf38e9 100644
--- a/be/src/runtime/mem-pool.h
+++ b/be/src/runtime/mem-pool.h
@@ -23,7 +23,6 @@
 
 #include "common/logging.h"
 #include "util/bit-util.h"
-#include "util/runtime-profile.h"
 
 namespace impala {
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/runtime/mem-tracker.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/mem-tracker.h b/be/src/runtime/mem-tracker.h
index 9534671..73c9300 100644
--- a/be/src/runtime/mem-tracker.h
+++ b/be/src/runtime/mem-tracker.h
@@ -29,7 +29,7 @@
 #include "util/debug-util.h"
 #include "util/internal-queue.h"
 #include "util/metrics.h"
-#include "util/runtime-profile.h"
+#include "util/runtime-profile-counters.h"
 #include "util/spinlock.h"
 
 #include "gen-cpp/Types_types.h" // for TUniqueId

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/runtime/plan-fragment-executor.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/plan-fragment-executor.cc b/be/src/runtime/plan-fragment-executor.cc
index 99ed75d..75c5f58 100644
--- a/be/src/runtime/plan-fragment-executor.cc
+++ b/be/src/runtime/plan-fragment-executor.cc
@@ -29,10 +29,13 @@
 #include "exec/hdfs-scan-node.h"
 #include "exec/hbase-table-scanner.h"
 #include "exprs/expr.h"
+#include "resourcebroker/resource-broker.h"
 #include "runtime/descriptors.h"
 #include "runtime/data-stream-mgr.h"
 #include "runtime/row-batch.h"
+#include "runtime/runtime-filter-bank.h"
 #include "runtime/mem-tracker.h"
+#include "scheduling/query-resource-mgr.h"
 #include "util/cgroups-mgr.h"
 #include "util/cpu-info.h"
 #include "util/debug-util.h"
@@ -589,7 +592,7 @@ void PlanFragmentExecutor::Close() {
           runtime_state_->fragment_instance_id(), runtime_state_->cgroup());
     }
     if (plan_ != NULL) plan_->Close(runtime_state_.get());
-    for (DiskIoMgr::RequestContext* context: *runtime_state_->reader_contexts()) {
+    for (DiskIoRequestContext* context: *runtime_state_->reader_contexts()) {
       runtime_state_->io_mgr()->UnregisterContext(context);
     }
     exec_env_->thread_mgr()->UnregisterPool(runtime_state_->resource_pool());

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/runtime/plan-fragment-executor.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/plan-fragment-executor.h b/be/src/runtime/plan-fragment-executor.h
index 91cece6..29250a3 100644
--- a/be/src/runtime/plan-fragment-executor.h
+++ b/be/src/runtime/plan-fragment-executor.h
@@ -24,6 +24,7 @@
 #include "common/status.h"
 #include "common/object-pool.h"
 #include "runtime/runtime-state.h"
+#include "util/runtime-profile-counters.h"
 #include "util/thread.h"
 
 namespace impala {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/runtime/row-batch-serialize-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/row-batch-serialize-test.cc b/be/src/runtime/row-batch-serialize-test.cc
index cb49bda..1f28440 100644
--- a/be/src/runtime/row-batch-serialize-test.cc
+++ b/be/src/runtime/row-batch-serialize-test.cc
@@ -15,6 +15,8 @@
 #include "testutil/gtest-util.h"
 #include "runtime/collection-value.h"
 #include "runtime/collection-value-builder.h"
+#include "runtime/mem-tracker.h"
+#include "runtime/raw-value.h"
 #include "runtime/raw-value.inline.h"
 #include "runtime/row-batch.h"
 #include "runtime/tuple-row.h"

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/runtime/row-batch.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/row-batch.h b/be/src/runtime/row-batch.h
index 3f2ba29..4197977 100644
--- a/be/src/runtime/row-batch.h
+++ b/be/src/runtime/row-batch.h
@@ -27,7 +27,6 @@
 #include "runtime/descriptors.h"
 #include "runtime/disk-io-mgr.h"
 #include "runtime/mem-pool.h"
-#include "runtime/mem-tracker.h"
 
 namespace impala {
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/runtime/runtime-filter-bank.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-filter-bank.cc b/be/src/runtime/runtime-filter-bank.cc
new file mode 100644
index 0000000..91f12c9
--- /dev/null
+++ b/be/src/runtime/runtime-filter-bank.cc
@@ -0,0 +1,222 @@
+// Copyright 2016 Cloudera Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "runtime/runtime-filter-bank.h"
+
+#include "common/names.h"
+#include "gen-cpp/ImpalaInternalService_types.h"
+#include "gutil/bits.h"
+#include "gutil/strings/substitute.h"
+#include "runtime/client-cache.h"
+#include "runtime/exec-env.h"
+#include "runtime/backend-client.h"
+#include "runtime/mem-tracker.h"
+#include "runtime/runtime-filter.inline.h"
+#include "service/impala-server.h"
+#include "util/bloom-filter.h"
+
+using namespace impala;
+using namespace boost;
+using namespace strings;
+
+DEFINE_double(max_filter_error_rate, 0.75, "(Advanced) The maximum probability of false "
+    "positives in a runtime filter before it is disabled.");
+
+const int64_t RuntimeFilterBank::MIN_BLOOM_FILTER_SIZE;
+const int64_t RuntimeFilterBank::MAX_BLOOM_FILTER_SIZE;
+
+RuntimeFilterBank::RuntimeFilterBank(const TQueryCtx& query_ctx, RuntimeState* state)
+    : state_(state), closed_(false) {
+  memory_allocated_ =
+      state->runtime_profile()->AddCounter("BloomFilterBytes", TUnit::BYTES);
+
+  // Clamp bloom filter size down to the limits {MIN,MAX}_BLOOM_FILTER_SIZE
+  max_filter_size_ = query_ctx.request.query_options.runtime_filter_max_size;
+  max_filter_size_ = max<int64_t>(max_filter_size_, MIN_BLOOM_FILTER_SIZE);
+  max_filter_size_ =
+      BitUtil::RoundUpToPowerOfTwo(min<int64_t>(max_filter_size_, MAX_BLOOM_FILTER_SIZE));
+
+  min_filter_size_ = query_ctx.request.query_options.runtime_filter_min_size;
+  min_filter_size_ = max<int64_t>(min_filter_size_, MIN_BLOOM_FILTER_SIZE);
+  min_filter_size_ =
+      BitUtil::RoundUpToPowerOfTwo(min<int64_t>(min_filter_size_, MAX_BLOOM_FILTER_SIZE));
+
+  // Make sure that min <= max
+  min_filter_size_ = min<int64_t>(min_filter_size_, max_filter_size_);
+
+  DCHECK_GT(min_filter_size_, 0);
+  DCHECK_GT(max_filter_size_, 0);
+
+  default_filter_size_ = query_ctx.request.query_options.runtime_bloom_filter_size;
+  default_filter_size_ = max<int64_t>(default_filter_size_, min_filter_size_);
+  default_filter_size_ =
+      BitUtil::RoundUpToPowerOfTwo(min<int64_t>(default_filter_size_, max_filter_size_));
+}
+
+RuntimeFilter* RuntimeFilterBank::RegisterFilter(const TRuntimeFilterDesc& filter_desc,
+    bool is_producer) {
+  RuntimeFilter* ret = obj_pool_.Add(
+      new RuntimeFilter(filter_desc, GetFilterSizeForNdv(filter_desc.ndv_estimate)));
+  lock_guard<mutex> l(runtime_filter_lock_);
+  if (is_producer) {
+    DCHECK(produced_filters_.find(filter_desc.filter_id) == produced_filters_.end());
+    produced_filters_[filter_desc.filter_id] = ret;
+  } else {
+    if (consumed_filters_.find(filter_desc.filter_id) == consumed_filters_.end()) {
+      consumed_filters_[filter_desc.filter_id] = ret;
+    } else {
+      // The filter has already been registered in this filter bank by another
+      // target node.
+      DCHECK_GT(filter_desc.targets.size(), 1);
+      ret = consumed_filters_[filter_desc.filter_id];
+    }
+  }
+  return ret;
+}
+
+namespace {
+
+/// Sends a filter to the coordinator. Executed asynchronously in the context of
+/// ExecEnv::rpc_pool().
+void SendFilterToCoordinator(TNetworkAddress address, TUpdateFilterParams params,
+    ImpalaBackendClientCache* client_cache) {
+  Status status;
+  ImpalaBackendConnection coord(client_cache, address, &status);
+  if (!status.ok()) {
+    // Failing to send a filter is not a query-wide error - the remote fragment will
+    // continue regardless.
+    // TODO: Retry.
+    LOG(INFO) << "Couldn't send filter to coordinator: " << status.msg().msg();
+    return;
+  }
+  TUpdateFilterResult res;
+  status = coord.DoRpc(&ImpalaBackendClient::UpdateFilter, params, &res);
+}
+
+}
+
+void RuntimeFilterBank::UpdateFilterFromLocal(int32_t filter_id,
+    BloomFilter* bloom_filter) {
+  DCHECK_NE(state_->query_options().runtime_filter_mode, TRuntimeFilterMode::OFF)
+      << "Should not be calling UpdateFilterFromLocal() if filtering is disabled";
+  TUpdateFilterParams params;
+  // A runtime filter may have both local and remote targets.
+  bool has_local_target = false;
+  bool has_remote_target = false;
+  {
+    lock_guard<mutex> l(runtime_filter_lock_);
+    RuntimeFilterMap::iterator it = produced_filters_.find(filter_id);
+    DCHECK(it != produced_filters_.end()) << "Tried to update unregistered filter: "
+                                          << filter_id;
+    it->second->SetBloomFilter(bloom_filter);
+    has_local_target = it->second->filter_desc().has_local_targets;
+    has_remote_target = it->second->filter_desc().has_remote_targets;
+  }
+
+  if (has_local_target) {
+    // Do a short circuit publication by pushing the same BloomFilter to the consumer
+    // side.
+    RuntimeFilter* filter;
+    {
+      lock_guard<mutex> l(runtime_filter_lock_);
+      RuntimeFilterMap::iterator it = consumed_filters_.find(filter_id);
+      if (it == consumed_filters_.end()) return;
+      filter = it->second;
+    }
+    filter->SetBloomFilter(bloom_filter);
+    state_->runtime_profile()->AddInfoString(
+        Substitute("Filter $0 arrival", filter_id),
+        PrettyPrinter::Print(filter->arrival_delay(), TUnit::TIME_MS));
+  }
+
+  if (has_remote_target
+      && state_->query_options().runtime_filter_mode == TRuntimeFilterMode::GLOBAL) {
+    BloomFilter::ToThrift(bloom_filter, &params.bloom_filter);
+    params.filter_id = filter_id;
+    params.query_id = state_->query_id();
+
+    ExecEnv::GetInstance()->rpc_pool()->Offer(bind<void>(
+        SendFilterToCoordinator, state_->query_ctx().coord_address, params,
+        ExecEnv::GetInstance()->impalad_client_cache()));
+  }
+}
+
+void RuntimeFilterBank::PublishGlobalFilter(int32_t filter_id,
+    const TBloomFilter& thrift_filter) {
+  lock_guard<mutex> l(runtime_filter_lock_);
+  if (closed_) return;
+  RuntimeFilterMap::iterator it = consumed_filters_.find(filter_id);
+  DCHECK(it != consumed_filters_.end()) << "Tried to publish unregistered filter: "
+                                        << filter_id;
+  if (thrift_filter.always_true) {
+    it->second->SetBloomFilter(BloomFilter::ALWAYS_TRUE_FILTER);
+  } else {
+    int64_t required_space =
+        BloomFilter::GetExpectedHeapSpaceUsed(thrift_filter.log_heap_space);
+    // Silently fail to publish the filter (replacing it with a 0-byte complete one) if
+    // there's not enough memory for it.
+    if (!state_->query_mem_tracker()->TryConsume(required_space)) {
+      VLOG_QUERY << "No memory for global filter: " << filter_id
+                 << " (fragment instance: " << state_->fragment_instance_id() << ")";
+      it->second->SetBloomFilter(BloomFilter::ALWAYS_TRUE_FILTER);
+    } else {
+      BloomFilter* bloom_filter = obj_pool_.Add(new BloomFilter(thrift_filter));
+      DCHECK_EQ(required_space, bloom_filter->GetHeapSpaceUsed());
+      memory_allocated_->Add(bloom_filter->GetHeapSpaceUsed());
+      it->second->SetBloomFilter(bloom_filter);
+    }
+  }
+  state_->runtime_profile()->AddInfoString(Substitute("Filter $0 arrival", filter_id),
+      PrettyPrinter::Print(it->second->arrival_delay(), TUnit::TIME_MS));
+}
+
+BloomFilter* RuntimeFilterBank::AllocateScratchBloomFilter(int32_t filter_id) {
+  lock_guard<mutex> l(runtime_filter_lock_);
+  if (closed_) return NULL;
+
+  RuntimeFilterMap::iterator it = produced_filters_.find(filter_id);
+  DCHECK(it != produced_filters_.end()) << "Filter ID " << filter_id << " not registered";
+
+  // Track required space
+  int64_t log_filter_size = Bits::Log2Ceiling64(it->second->filter_size());
+  int64_t required_space = BloomFilter::GetExpectedHeapSpaceUsed(log_filter_size);
+  if (!state_->query_mem_tracker()->TryConsume(required_space)) return NULL;
+  BloomFilter* bloom_filter = obj_pool_.Add(new BloomFilter(log_filter_size));
+  DCHECK_EQ(required_space, bloom_filter->GetHeapSpaceUsed());
+  memory_allocated_->Add(bloom_filter->GetHeapSpaceUsed());
+  return bloom_filter;
+}
+
+int64_t RuntimeFilterBank::GetFilterSizeForNdv(int64_t ndv) {
+  if (ndv == -1) return default_filter_size_;
+  int64_t required_space =
+      1LL << BloomFilter::MinLogSpace(ndv, FLAGS_max_filter_error_rate);
+  required_space = max<int64_t>(required_space, min_filter_size_);
+  required_space = min<int64_t>(required_space, max_filter_size_);
+  return required_space;
+}
+
+bool RuntimeFilterBank::FpRateTooHigh(int64_t filter_size, int64_t observed_ndv) {
+  double fpp =
+      BloomFilter::FalsePositiveProb(observed_ndv, Bits::Log2Ceiling64(filter_size));
+  return fpp > FLAGS_max_filter_error_rate;
+}
+
+void RuntimeFilterBank::Close() {
+  lock_guard<mutex> l(runtime_filter_lock_);
+  closed_ = true;
+  obj_pool_.Clear();
+  state_->query_mem_tracker()->Release(memory_allocated_->value());
+}
+
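The size clamping in the RuntimeFilterBank constructor above can be sketched in isolation as follows. This is a minimal, standalone sketch, not the Impala code: RoundUpToPowerOfTwo is a hypothetical stand-in assumed to behave like BitUtil::RoundUpToPowerOfTwo, and the three query options are passed in as plain integers instead of being read from a TQueryCtx.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>

// Hypothetical stand-in for BitUtil::RoundUpToPowerOfTwo: smallest power of
// two that is >= v (assumes v >= 1).
int64_t RoundUpToPowerOfTwo(int64_t v) {
  int64_t p = 1;
  while (p < v) p <<= 1;
  return p;
}

constexpr int64_t kMinBloomFilterSize = 4 * 1024;          // MIN_BLOOM_FILTER_SIZE
constexpr int64_t kMaxBloomFilterSize = 16 * 1024 * 1024;  // MAX_BLOOM_FILTER_SIZE

struct FilterSizes {
  int64_t min_size;
  int64_t max_size;
  int64_t default_size;
};

// Follows the same order of operations as the constructor: clamp max and min
// into [kMin, kMax] and round up, force min <= max, then clamp the default
// into [min, max] and round it up as well.
FilterSizes ClampFilterSizes(int64_t opt_min, int64_t opt_max, int64_t opt_default) {
  FilterSizes s;
  s.max_size = std::max<int64_t>(opt_max, kMinBloomFilterSize);
  s.max_size = RoundUpToPowerOfTwo(std::min<int64_t>(s.max_size, kMaxBloomFilterSize));

  s.min_size = std::max<int64_t>(opt_min, kMinBloomFilterSize);
  s.min_size = RoundUpToPowerOfTwo(std::min<int64_t>(s.min_size, kMaxBloomFilterSize));
  s.min_size = std::min<int64_t>(s.min_size, s.max_size);  // make sure min <= max

  assert(s.min_size > 0 && s.max_size > 0);  // mirrors the two DCHECK_GTs

  s.default_size = std::max<int64_t>(opt_default, s.min_size);
  s.default_size = RoundUpToPowerOfTwo(std::min<int64_t>(s.default_size, s.max_size));
  return s;
}
```

For example, ClampFilterSizes(0, 3000000, 100000) yields a minimum of 4096 (raised to the floor), a maximum of 4194304 (3,000,000 rounded up to 2^22) and a default of 131072 (100,000 rounded up to 2^17). Because the upper limit is itself a power of two, rounding up after the min() can never push a size past it.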

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/runtime/runtime-filter-bank.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-filter-bank.h b/be/src/runtime/runtime-filter-bank.h
new file mode 100644
index 0000000..4703a0f
--- /dev/null
+++ b/be/src/runtime/runtime-filter-bank.h
@@ -0,0 +1,149 @@
+// Copyright 2016 Cloudera Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#ifndef IMPALA_RUNTIME_RUNTIME_FILTER_BANK_H
+#define IMPALA_RUNTIME_RUNTIME_FILTER_BANK_H
+
+#include "common/object-pool.h"
+#include "runtime/types.h"
+#include "util/runtime-profile.h"
+
+#include <boost/thread/lock_guard.hpp>
+#include <boost/unordered_map.hpp>
+
+namespace impala {
+
+class BloomFilter;
+class RuntimeFilter;
+class RuntimeState;
+class TBloomFilter;
+class TRuntimeFilterDesc;
+class TQueryCtx;
+
+/// RuntimeFilters are produced and consumed by plan nodes at run time to propagate
+/// predicates across the plan tree dynamically. Each fragment instance manages its
+/// filters with a RuntimeFilterBank which provides low-synchronization access to filter
+/// objects and data structures.
+///
+/// A RuntimeFilterBank manages both production and consumption of filters. In the case
+/// where a given filter is both consumed and produced by the same fragment, the
+/// RuntimeFilterBank treats each filter independently.
+///
+/// All filters must be registered with the filter bank via RegisterFilter(). Local plan
+/// fragments update the bloom filters by calling UpdateFilterFromLocal()
+/// (UpdateFilterFromLocal() may only be called once per filter ID per filter bank). The
+/// bloom_filter that is passed into UpdateFilterFromLocal() must have been allocated by
+/// AllocateScratchBloomFilter(); this allows RuntimeFilterBank to manage all memory
+/// associated with filters.
+///
+/// Filters are aggregated at the coordinator, and then made available to consumers after
+/// PublishGlobalFilter() has been called.
+///
+/// After PublishGlobalFilter() has been called (and again, it may only be called once per
+/// filter_id), the RuntimeFilter object associated with filter_id will have a valid
+/// bloom_filter, and may be used for filter evaluation. This operation occurs without
+/// synchronisation, and neither the thread that calls PublishGlobalFilter() nor the
+/// thread that may call RuntimeFilter::Eval() need to coordinate in any way.
+class RuntimeFilterBank {
+ public:
+  RuntimeFilterBank(const TQueryCtx& query_ctx, RuntimeState* state);
+
+  /// Registers a filter that will either be produced (is_producer == true) or consumed
+  /// (is_producer == false) by fragments that share this RuntimeState. The filter
+  /// bloom_filter itself is unallocated until the first call to PublishGlobalFilter().
+  RuntimeFilter* RegisterFilter(const TRuntimeFilterDesc& filter_desc, bool is_producer);
+
+  /// Updates a filter's bloom_filter with 'bloom_filter' which has been produced by some
+  /// operator in the local fragment instance. 'bloom_filter' may be NULL, representing a
+  /// full filter that contains all elements.
+  void UpdateFilterFromLocal(int32_t filter_id, BloomFilter* bloom_filter);
+
+  /// Makes a bloom_filter (aggregated globally from all producer fragments) available for
+  /// consumption by operators that wish to use it for filtering.
+  void PublishGlobalFilter(int32_t filter_id, const TBloomFilter& thrift_filter);
+
+  /// Returns true if, according to the observed NDV in 'observed_ndv', a filter of size
+  /// 'filter_size' would have an expected false-positive rate which would exceed
+  /// FLAGS_max_filter_error_rate.
+  bool FpRateTooHigh(int64_t filter_size, int64_t observed_ndv);
+
+  /// Returns a RuntimeFilter with the given filter id. This is safe to call after all
+  /// calls to RegisterFilter() have finished, and not before. Filters may be cached by
+  /// clients and subsequently accessed without synchronization. Concurrent calls to
+  /// PublishGlobalFilter() will update a filter's bloom filter atomically, without the
+  /// need for client synchronization.
+  inline const RuntimeFilter* GetRuntimeFilter(int32_t filter_id);
+
+  /// Returns a bloom_filter that can be used by an operator to produce a local filter,
+  /// which may then be used in UpdateFilterFromLocal(). The memory returned is owned by
+  /// the RuntimeFilterBank (which may transfer it to a RuntimeFilter subsequently), and
+  /// should not be deleted by the caller. The filter identified by 'filter_id' must have
+  /// been previously registered as a 'producer' by RegisterFilter().
+  ///
+  /// If there is not enough memory, or if Close() has been called first, returns NULL.
+  BloomFilter* AllocateScratchBloomFilter(int32_t filter_id);
+
+  /// Default hash seed to use when computing hashed values to insert into filters.
+  static const int32_t DefaultHashSeed() { return 1234; }
+
+  /// Releases all memory allocated for BloomFilters.
+  void Close();
+
+  static const int64_t MIN_BLOOM_FILTER_SIZE = 4 * 1024;           // 4KB
+  static const int64_t MAX_BLOOM_FILTER_SIZE = 16 * 1024 * 1024;   // 16MB
+
+ private:
+  /// Returns the space (in bytes) required for a filter to achieve the configured
+  /// maximum false-positive rate based on the expected NDV. If 'ndv' is -1 (i.e. no
+  /// estimate is known), the default filter size is returned.
+  int64_t GetFilterSizeForNdv(int64_t ndv);
+
+  /// Lock protecting produced_filters_ and consumed_filters_.
+  boost::mutex runtime_filter_lock_;
+
+  /// Map from filter id to a RuntimeFilter.
+  typedef boost::unordered_map<int32_t, RuntimeFilter*> RuntimeFilterMap;
+
+  /// All filters expected to be produced by the local plan fragment instance.
+  RuntimeFilterMap produced_filters_;
+
+  /// All filters expected to be consumed by the local plan fragment instance.
+  RuntimeFilterMap consumed_filters_;
+
+  /// Fragment instance's runtime state.
+  RuntimeState* state_;
+
+  /// Object pool to track allocated Bloom filters.
+  ObjectPool obj_pool_;
+
+  /// True iff Close() has been called. Used to prevent races between
+  /// AllocateScratchBloomFilter() and Close().
+  bool closed_;
+
+  /// Total amount of memory allocated to Bloom Filters
+  RuntimeProfile::Counter* memory_allocated_;
+
+  /// Precomputed default BloomFilter size.
+  int64_t default_filter_size_;
+
+  /// Maximum filter size, in bytes, rounded up to a power of two.
+  int64_t max_filter_size_;
+
+  /// Minimum filter size, in bytes, rounded up to a power of two.
+  int64_t min_filter_size_;
+};
+
+}
+
+#endif
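The sizing contract documented for GetFilterSizeForNdv() and FpRateTooHigh() can be illustrated with a standalone sketch. It rests on a loudly labeled assumption: the false-positive estimate below is the textbook Bloom filter approximation (1 - e^(-k*n/m))^k with k = 8, which is not necessarily the formula used by Impala's BloomFilter class. MinLogSpace and Log2Ceiling64 here are hypothetical stand-ins for BloomFilter::MinLogSpace and Bits::Log2Ceiling64, and the flag and size limits are passed as parameters.

```cpp
#include <algorithm>
#include <cassert>
#include <cmath>
#include <cstdint>

// ASSUMPTION: textbook false-positive approximation with k = 8 hash functions
// for 'ndv' distinct values in a filter of 2^log_bytes bytes.
double FalsePositiveProb(int64_t ndv, int log_bytes) {
  const double k = 8.0;
  const double m_bits = std::ldexp(1.0, log_bytes + 3);  // 2^(log_bytes + 3) bits
  return std::pow(1.0 - std::exp(-k * static_cast<double>(ndv) / m_bits), k);
}

// Hypothetical MinLogSpace: smallest log2(bytes) whose estimated
// false-positive probability does not exceed 'fpp'.
int MinLogSpace(int64_t ndv, double fpp) {
  int log_bytes = 0;
  while (log_bytes < 62 && FalsePositiveProb(ndv, log_bytes) > fpp) ++log_bytes;
  return log_bytes;
}

// Hypothetical Log2Ceiling64: smallest 'log' with 2^log >= n.
int Log2Ceiling64(int64_t n) {
  int log = 0;
  while (log < 62 && (int64_t{1} << log) < n) ++log;
  return log;
}

// Same shape as GetFilterSizeForNdv(): -1 means "no NDV estimate".
int64_t GetFilterSizeForNdv(int64_t ndv, double max_error_rate, int64_t min_size,
                            int64_t max_size, int64_t default_size) {
  assert(min_size <= max_size);  // the constructor guarantees min <= max
  if (ndv == -1) return default_size;
  int64_t required = int64_t{1} << MinLogSpace(ndv, max_error_rate);
  required = std::max(required, min_size);
  return std::min(required, max_size);
}

// Same shape as FpRateTooHigh(): would a filter of 'filter_size' bytes exceed
// the error rate for 'observed_ndv' values?
bool FpRateTooHigh(int64_t filter_size, int64_t observed_ndv, double max_error_rate) {
  return FalsePositiveProb(observed_ndv, Log2Ceiling64(filter_size)) > max_error_rate;
}
```

Under this assumed formula and the default 0.75 error rate, an NDV estimate of 1,000,000 needs a 512KB filter, a tiny estimate is raised to the 4KB floor, and a very large one is capped at 16MB. That cap is exactly the case FpRateTooHigh() exists to detect, since a capped filter may be too small for the values actually observed.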

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/runtime/runtime-filter.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-filter.cc b/be/src/runtime/runtime-filter.cc
index b0126ec..9ae6cbb 100644
--- a/be/src/runtime/runtime-filter.cc
+++ b/be/src/runtime/runtime-filter.cc
@@ -14,211 +14,14 @@
 
 #include "runtime/runtime-filter.inline.h"
 
+#include "util/time.h"
+
 #include "common/names.h"
-#include "gutil/bits.h"
-#include "gutil/strings/substitute.h"
-#include "runtime/client-cache.h"
-#include "runtime/exec-env.h"
-#include "runtime/backend-client.h"
-#include "service/impala-server.h"
-#include "util/bloom-filter.h"
 
 using namespace impala;
-using namespace boost;
-using namespace strings;
-
-DEFINE_double(max_filter_error_rate, 0.75, "(Advanced) The maximum probability of false "
-    "positives in a runtime filter before it is disabled.");
 
 const int RuntimeFilter::SLEEP_PERIOD_MS = 20;
 
-const int64_t RuntimeFilterBank::MIN_BLOOM_FILTER_SIZE;
-const int64_t RuntimeFilterBank::MAX_BLOOM_FILTER_SIZE;
-
-RuntimeFilterBank::RuntimeFilterBank(const TQueryCtx& query_ctx, RuntimeState* state)
-    : query_ctx_(query_ctx), state_(state), closed_(false) {
-  memory_allocated_ =
-      state->runtime_profile()->AddCounter("BloomFilterBytes", TUnit::BYTES);
-
-  // Clamp bloom filter size down to the limits {MIN,MAX}_BLOOM_FILTER_SIZE
-  max_filter_size_ = query_ctx_.request.query_options.runtime_filter_max_size;
-  max_filter_size_ = max<int64_t>(max_filter_size_, MIN_BLOOM_FILTER_SIZE);
-  max_filter_size_ =
-      BitUtil::RoundUpToPowerOfTwo(min<int64_t>(max_filter_size_, MAX_BLOOM_FILTER_SIZE));
-
-  min_filter_size_ = query_ctx_.request.query_options.runtime_filter_min_size;
-  min_filter_size_ = max<int64_t>(min_filter_size_, MIN_BLOOM_FILTER_SIZE);
-  min_filter_size_ =
-      BitUtil::RoundUpToPowerOfTwo(min<int64_t>(min_filter_size_, MAX_BLOOM_FILTER_SIZE));
-
-  // Make sure that min <= max
-  min_filter_size_ = min<int64_t>(min_filter_size_, max_filter_size_);
-
-  DCHECK_GT(min_filter_size_, 0);
-  DCHECK_GT(max_filter_size_, 0);
-
-  default_filter_size_ = query_ctx_.request.query_options.runtime_bloom_filter_size;
-  default_filter_size_ = max<int64_t>(default_filter_size_, min_filter_size_);
-  default_filter_size_ =
-      BitUtil::RoundUpToPowerOfTwo(min<int64_t>(default_filter_size_, max_filter_size_));
-}
-
-RuntimeFilter* RuntimeFilterBank::RegisterFilter(const TRuntimeFilterDesc& filter_desc,
-    bool is_producer) {
-  RuntimeFilter* ret = obj_pool_.Add(
-      new RuntimeFilter(filter_desc, GetFilterSizeForNdv(filter_desc.ndv_estimate)));
-  lock_guard<mutex> l(runtime_filter_lock_);
-  if (is_producer) {
-    DCHECK(produced_filters_.find(filter_desc.filter_id) == produced_filters_.end());
-    produced_filters_[filter_desc.filter_id] = ret;
-  } else {
-    if (consumed_filters_.find(filter_desc.filter_id) == consumed_filters_.end()) {
-      consumed_filters_[filter_desc.filter_id] = ret;
-    } else {
-      // The filter has already been registered in this filter bank by another
-      // target node.
-      DCHECK_GT(filter_desc.targets.size(), 1);
-      ret = consumed_filters_[filter_desc.filter_id];
-    }
-  }
-  return ret;
-}
-
-namespace {
-
-/// Sends a filter to the coordinator. Executed asynchronously in the context of
-/// ExecEnv::rpc_pool().
-void SendFilterToCoordinator(TNetworkAddress address, TUpdateFilterParams params,
-    ImpalaBackendClientCache* client_cache) {
-  Status status;
-  ImpalaBackendConnection coord(client_cache, address, &status);
-  if (!status.ok()) {
-    // Failing to send a filter is not a query-wide error - the remote fragment will
-    // continue regardless.
-    // TODO: Retry.
-    LOG(INFO) << "Couldn't send filter to coordinator: " << status.msg().msg();
-    return;
-  }
-  TUpdateFilterResult res;
-  status = coord.DoRpc(&ImpalaBackendClient::UpdateFilter, params, &res);
-}
-
-}
-
-void RuntimeFilterBank::UpdateFilterFromLocal(int32_t filter_id,
-    BloomFilter* bloom_filter) {
-  DCHECK_NE(state_->query_options().runtime_filter_mode, TRuntimeFilterMode::OFF)
-      << "Should not be calling UpdateFilterFromLocal() if filtering is disabled";
-  TUpdateFilterParams params;
-  // A runtime filter may have both local and remote targets.
-  bool has_local_target = false;
-  bool has_remote_target = false;
-  {
-    lock_guard<mutex> l(runtime_filter_lock_);
-    RuntimeFilterMap::iterator it = produced_filters_.find(filter_id);
-    DCHECK(it != produced_filters_.end()) << "Tried to update unregistered filter: "
-                                          << filter_id;
-    it->second->SetBloomFilter(bloom_filter);
-    has_local_target = it->second->filter_desc().has_local_targets;
-    has_remote_target = it->second->filter_desc().has_remote_targets;
-  }
-
-  if (has_local_target) {
-    // Do a short circuit publication by pushing the same BloomFilter to the consumer
-    // side.
-    RuntimeFilter* filter;
-    {
-      lock_guard<mutex> l(runtime_filter_lock_);
-      RuntimeFilterMap::iterator it = consumed_filters_.find(filter_id);
-      if (it == consumed_filters_.end()) return;
-      filter = it->second;
-    }
-    filter->SetBloomFilter(bloom_filter);
-    state_->runtime_profile()->AddInfoString(
-        Substitute("Filter $0 arrival", filter_id),
-        PrettyPrinter::Print(filter->arrival_delay(), TUnit::TIME_MS));
-  }
-
-  if (has_remote_target
-      && state_->query_options().runtime_filter_mode == TRuntimeFilterMode::GLOBAL) {
-    BloomFilter::ToThrift(bloom_filter, &params.bloom_filter);
-    params.filter_id = filter_id;
-    params.query_id = query_ctx_.query_id;
-
-    ExecEnv::GetInstance()->rpc_pool()->Offer(bind<void>(
-        SendFilterToCoordinator, query_ctx_.coord_address, params,
-        ExecEnv::GetInstance()->impalad_client_cache()));
-  }
-}
-
-void RuntimeFilterBank::PublishGlobalFilter(int32_t filter_id,
-    const TBloomFilter& thrift_filter) {
-  lock_guard<mutex> l(runtime_filter_lock_);
-  if (closed_) return;
-  RuntimeFilterMap::iterator it = consumed_filters_.find(filter_id);
-  DCHECK(it != consumed_filters_.end()) << "Tried to publish unregistered filter: "
-                                        << filter_id;
-  if (thrift_filter.always_true) {
-    it->second->SetBloomFilter(BloomFilter::ALWAYS_TRUE_FILTER);
-  } else {
-    int64_t required_space =
-        BloomFilter::GetExpectedHeapSpaceUsed(thrift_filter.log_heap_space);
-    // Silently fail to publish the filter (replacing it with a 0-byte complete one) if
-    // there's not enough memory for it.
-    if (!state_->query_mem_tracker()->TryConsume(required_space)) {
-      VLOG_QUERY << "No memory for global filter: " << filter_id
-                 << " (fragment instance: " << state_->fragment_instance_id() << ")";
-      it->second->SetBloomFilter(BloomFilter::ALWAYS_TRUE_FILTER);
-    } else {
-      BloomFilter* bloom_filter = obj_pool_.Add(new BloomFilter(thrift_filter));
-      DCHECK_EQ(required_space, bloom_filter->GetHeapSpaceUsed());
-      memory_allocated_->Add(bloom_filter->GetHeapSpaceUsed());
-      it->second->SetBloomFilter(bloom_filter);
-    }
-  }
-  state_->runtime_profile()->AddInfoString(Substitute("Filter $0 arrival", filter_id),
-      PrettyPrinter::Print(it->second->arrival_delay(), TUnit::TIME_MS));
-}
-
-BloomFilter* RuntimeFilterBank::AllocateScratchBloomFilter(int32_t filter_id) {
-  lock_guard<mutex> l(runtime_filter_lock_);
-  if (closed_) return NULL;
-
-  RuntimeFilterMap::iterator it = produced_filters_.find(filter_id);
-  DCHECK(it != produced_filters_.end()) << "Filter ID " << filter_id << " not registered";
-
-  // Track required space
-  int64_t log_filter_size = Bits::Log2Ceiling64(it->second->filter_size());
-  int64_t required_space = BloomFilter::GetExpectedHeapSpaceUsed(log_filter_size);
-  if (!state_->query_mem_tracker()->TryConsume(required_space)) return NULL;
-  BloomFilter* bloom_filter = obj_pool_.Add(new BloomFilter(log_filter_size));
-  DCHECK_EQ(required_space, bloom_filter->GetHeapSpaceUsed());
-  memory_allocated_->Add(bloom_filter->GetHeapSpaceUsed());
-  return bloom_filter;
-}
-
-int64_t RuntimeFilterBank::GetFilterSizeForNdv(int64_t ndv) {
-  if (ndv == -1) return default_filter_size_;
-  int64_t required_space =
-      1LL << BloomFilter::MinLogSpace(ndv, FLAGS_max_filter_error_rate);
-  required_space = max<int64_t>(required_space, min_filter_size_);
-  required_space = min<int64_t>(required_space, max_filter_size_);
-  return required_space;
-}
-
-bool RuntimeFilterBank::FpRateTooHigh(int64_t filter_size, int64_t observed_ndv) {
-  double fpp =
-      BloomFilter::FalsePositiveProb(observed_ndv, Bits::Log2Ceiling64(filter_size));
-  return fpp > FLAGS_max_filter_error_rate;
-}
-
-void RuntimeFilterBank::Close() {
-  lock_guard<mutex> l(runtime_filter_lock_);
-  closed_ = true;
-  obj_pool_.Clear();
-  state_->query_mem_tracker()->Release(memory_allocated_->value());
-}
-
 bool RuntimeFilter::WaitForArrival(int32_t timeout_ms) const {
   do {
     if (HasBloomFilter()) return true;

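Reading aid (not part of the patch): the RuntimeFilterBank constructor and GetFilterSizeForNdv() removed above clamp the filter-size query options to fixed limits and round them up to powers of two. The diffstat suggests this code moves into runtime-filter-bank.cc. The standalone sketch below replays only that arithmetic. It makes two substitutions:

- `RoundUpToPowerOfTwo` is a local stand-in for Impala's `BitUtil` helper.
- The `log_space` argument stands in for `BloomFilter::MinLogSpace()`, whose formula does not appear in this diff.

The names `FilterSizes`, `ClampFilterSizes` and `FilterSizeForNdv` are hypothetical.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>

// Hard limits, copied from the RuntimeFilterBank declaration in this diff.
constexpr int64_t kMinBloomFilterSize = 4 * 1024;          // 4KB
constexpr int64_t kMaxBloomFilterSize = 16 * 1024 * 1024;  // 16MB

// Local stand-in for BitUtil::RoundUpToPowerOfTwo(); assumes v > 0.
int64_t RoundUpToPowerOfTwo(int64_t v) {
  assert(v > 0);
  int64_t p = 1;
  while (p < v) p <<= 1;
  return p;
}

struct FilterSizes {
  int64_t min_size;
  int64_t max_size;
  int64_t default_size;
};

// Mirrors the constructor's sequence:
//   1. Clamp each option to the hard limits and round it up to a power of two.
//   2. Cap min at max.
//   3. Clamp the default to [min, max].
FilterSizes ClampFilterSizes(int64_t opt_min, int64_t opt_max, int64_t opt_default) {
  FilterSizes s;
  s.max_size = std::max<int64_t>(opt_max, kMinBloomFilterSize);
  s.max_size = RoundUpToPowerOfTwo(std::min<int64_t>(s.max_size, kMaxBloomFilterSize));

  s.min_size = std::max<int64_t>(opt_min, kMinBloomFilterSize);
  s.min_size = RoundUpToPowerOfTwo(std::min<int64_t>(s.min_size, kMaxBloomFilterSize));
  s.min_size = std::min<int64_t>(s.min_size, s.max_size);  // ensure min <= max

  s.default_size = std::max<int64_t>(opt_default, s.min_size);
  s.default_size = RoundUpToPowerOfTwo(std::min<int64_t>(s.default_size, s.max_size));
  return s;
}

// Mirrors GetFilterSizeForNdv(): ndv == -1 means "no estimate", so the default
// size is used. Otherwise 2^log_space is clamped to [min, max].
int64_t FilterSizeForNdv(const FilterSizes& s, int64_t ndv, int log_space) {
  if (ndv == -1) return s.default_size;
  int64_t required = int64_t{1} << log_space;
  required = std::max<int64_t>(required, s.min_size);
  return std::min<int64_t>(required, s.max_size);
}
```

For example, options of min = 1000, max = 1 GB (2^30) and default = 100000 bytes yield:

- min: 4 KB (4096 bytes)
- max: 16 MB
- default: 131072 bytes, since 100000 rounds up to 2^17

Because every size is a power of two, a filter's log2 size is exact. This matters because the allocation path above passes `Bits::Log2Ceiling64(filter_size)` to BloomFilter.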
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/runtime/runtime-filter.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-filter.h b/be/src/runtime/runtime-filter.h
index 178c03f..4168704 100644
--- a/be/src/runtime/runtime-filter.h
+++ b/be/src/runtime/runtime-filter.h
@@ -16,135 +16,14 @@
 #ifndef IMPALA_RUNTIME_RUNTIME_FILTER_H
 #define IMPALA_RUNTIME_RUNTIME_FILTER_H
 
-#include <boost/unordered_map.hpp>
-
-#include "common/object-pool.h"
-#include "gen-cpp/ImpalaInternalService_types.h"
-#include "gen-cpp/PlanNodes_types.h"
-#include "runtime/types.h"
-#include "util/runtime-profile.h"
+#include "runtime/raw-value.h"
+#include "runtime/runtime-filter-bank.h"
+#include "util/bloom-filter.h"
 #include "util/spinlock.h"
 
 namespace impala {
 
 class BloomFilter;
-class RuntimeFilter;
-class RuntimeState;
-
-/// RuntimeFilters are produced and consumed by plan nodes at run time to propagate
-/// predicates across the plan tree dynamically. Each fragment instance manages its
-/// filters with a RuntimeFilterBank which provides low-synchronization access to filter
-/// objects and data structures.
-///
-/// A RuntimeFilterBank manages both production and consumption of filters. In the case
-/// where a given filter is both consumed and produced by the same fragment, the
-/// RuntimeFilterBank treats each filter independently.
-///
-/// All filters must be registered with the filter bank via RegisterFilter(). Local plan
-/// fragments update the bloom filters by calling UpdateFilterFromLocal()
-/// (UpdateFilterFromLocal() may only be called once per filter ID per filter bank). The
-/// bloom_filter that is passed into UpdateFilterFromLocal() must have been allocated by
-/// AllocateScratchBloomFilter() (or be NULL); this allows RuntimeFilterBank to manage all
-/// memory associated with filters.
-///
-/// Filters are aggregated at the coordinator, and then made available to consumers after
-/// PublishGlobalFilter() has been called.
-///
-/// After PublishGlobalFilter() has been called (and again, it may only be called once per
-/// filter_id), the RuntimeFilter object associated with filter_id will have a valid
-/// bloom_filter, and may be used for filter evaluation. This operation occurs without
-/// synchronisation, and neither the thread that calls PublishGlobalFilter() nor the
-/// thread that may call RuntimeFilter::Eval() need to coordinate in any way.
-class RuntimeFilterBank {
- public:
-  RuntimeFilterBank(const TQueryCtx& query_ctx, RuntimeState* state);
-
-  /// Registers a filter that will either be produced (is_producer == false) or consumed
-  /// (is_producer == true) by fragments that share this RuntimeState. The filter
-  /// bloom_filter itself is unallocated until the first call to PublishGlobalFilter().
-  RuntimeFilter* RegisterFilter(const TRuntimeFilterDesc& filter_desc, bool is_producer);
-
-  /// Updates a filter's bloom_filter with 'bloom_filter' which has been produced by some
-  /// operator in the local fragment instance. 'bloom_filter' may be NULL, representing a
-  /// full filter that contains all elements.
-  void UpdateFilterFromLocal(int32_t filter_id, BloomFilter* bloom_filter);
-
-  /// Makes a bloom_filter (aggregated globally from all producer fragments) available for
-  /// consumption by operators that wish to use it for filtering.
-  void PublishGlobalFilter(int32_t filter_id, const TBloomFilter& thrift_filter);
-
-  /// Returns true if, according to the observed NDV in 'observed_ndv', a filter of size
-  /// 'filter_size' would have an expected false-positive rate which would exceed
-  /// FLAGS_max_filter_error_rate.
-  bool FpRateTooHigh(int64_t filter_size, int64_t observed_ndv);
-
-  /// Returns a RuntimeFilter with the given filter id. This is safe to call after all
-  /// calls to RegisterFilter() have finished, and not before. Filters may be cached by
-  /// clients and subsequently accessed without synchronization. Concurrent calls to
-  /// PublishGlobalFilter() will update a filter's bloom filter atomically, without the
-  /// need for client synchronization.
-  inline const RuntimeFilter* GetRuntimeFilter(int32_t filter_id);
-
-  /// Returns a bloom_filter that can be used by an operator to produce a local filter,
-  /// which may then be used in UpdateFilterFromLocal(). The memory returned is owned by
-  /// the RuntimeFilterBank (which may transfer it to a RuntimeFilter subsequently), and
-  /// should not be deleted by the caller. The filter identified by 'filter_id' must have
-  /// been previously registered as a 'producer' by RegisterFilter().
-  ///
-  /// If there is not enough memory, or if Close() has been called first, returns NULL.
-  BloomFilter* AllocateScratchBloomFilter(int32_t filter_id);
-
-  /// Default hash seed to use when computing hashed values to insert into filters.
-  static const int32_t DefaultHashSeed() { return 1234; }
-
-  /// Releases all memory allocated for BloomFilters.
-  void Close();
-
-  static const int64_t MIN_BLOOM_FILTER_SIZE = 4 * 1024;           // 4KB
-  static const int64_t MAX_BLOOM_FILTER_SIZE = 16 * 1024 * 1024;   // 16MB
-
- private:
-  /// Returns the the space (in bytes) required for a filter to achieve the configured
-  /// maximum false-positive rate based on the expected NDV. If 'ndv' is -1 (i.e. no
-  /// estimate is known), the default filter size is returned.
-  int64_t GetFilterSizeForNdv(int64_t ndv);
-
-  const TQueryCtx query_ctx_;
-
-  /// Lock protecting produced_filters_ and consumed_filters_.
-  boost::mutex runtime_filter_lock_;
-
-  /// Map from filter id to a RuntimeFilter.
-  typedef boost::unordered_map<int32_t, RuntimeFilter*> RuntimeFilterMap;
-
-  /// All filters expected to be produced by the local plan fragment instance.
-  RuntimeFilterMap produced_filters_;
-
-  /// All filters expected to be consumed by the local plan fragment instance.
-  RuntimeFilterMap consumed_filters_;
-
-  /// Fragment instance's runtime state.
-  RuntimeState* state_;
-
-  /// Object pool to track allocated Bloom filters.
-  ObjectPool obj_pool_;
-
-  /// True iff Close() has been called. Used to prevent races between
-  /// AllocateScratchBloomFilter() and Close().
-  bool closed_;
-
-  /// Total amount of memory allocated to Bloom Filters
-  RuntimeProfile::Counter* memory_allocated_;
-
-  /// Precomputed default BloomFilter size.
-  int64_t default_filter_size_;
-
-  /// Maximum filter size, in bytes, rounded up to a power of two.
-  int64_t max_filter_size_;
-
-  /// Minimum filter size, in bytes, rounded up to a power of two.
-  int64_t min_filter_size_;
-};
 
 /// RuntimeFilters represent set-membership predicates (implemented with bloom filters)
 /// that are computed during query execution (rather than during planning). They can then

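Reading aid (not part of the patch): the RuntimeFilterBank comments above describe a registration protocol with two rules:

- A producer may register a given filter id only once.
- Several consumer (target) nodes in the same fragment instance share one consumer-side object per id. That object is independent of the producer-side object for the same id.

Below is a minimal C++ sketch of just that bookkeeping. The `SketchFilter` and `SketchFilterBank` types are hypothetical stand-ins for RuntimeFilter and RuntimeFilterBank. Bloom filter allocation, memory tracking and publication are omitted.

```cpp
#include <cassert>
#include <memory>
#include <mutex>
#include <unordered_map>
#include <vector>

// Hypothetical minimal filter. The real RuntimeFilter also carries a
// TRuntimeFilterDesc and a BloomFilter*.
struct SketchFilter {
  int id;
};

class SketchFilterBank {
 public:
  // Returns the filter object for 'filter_id'. Producer and consumer entries
  // live in separate maps, so the two roles never share an object.
  SketchFilter* RegisterFilter(int filter_id, bool is_producer) {
    std::lock_guard<std::mutex> l(lock_);
    std::unordered_map<int, SketchFilter*>& map = is_producer ? produced_ : consumed_;
    auto it = map.find(filter_id);
    if (it != map.end()) {
      // Impala DCHECKs that a producer id is registered only once. A repeated
      // consumer registration (another target node) reuses the existing object.
      assert(!is_producer && "producer filter registered twice");
      return it->second;
    }
    pool_.push_back(std::unique_ptr<SketchFilter>(new SketchFilter{filter_id}));
    SketchFilter* f = pool_.back().get();
    map[filter_id] = f;
    return f;
  }

 private:
  std::mutex lock_;  // guards both maps, like runtime_filter_lock_
  std::unordered_map<int, SketchFilter*> produced_;
  std::unordered_map<int, SketchFilter*> consumed_;
  std::vector<std::unique_ptr<SketchFilter>> pool_;  // owns filters, like obj_pool_
};
```

The original code allocates the RuntimeFilter before taking the lock, which keeps the critical section short. The sketch allocates under the lock for simplicity.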


[4/4] incubator-impala git commit: Refactor RuntimeState and ExecEnv dependencies

Refactor RuntimeState and ExecEnv dependencies

Previously, including runtime-state.h or exec-env.h pulled in a huge
number of headers. By replacing all of those includes with forward
declarations, we can reduce the number of headers included when building
each source file.

This required various changes, including splitting header files, and in
one case extracting the nested DiskIoMgr::RequestContext class so that
the RequestContext can be instantiated without the full DiskIoMgr
header.
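The forward-declaration technique described above can be illustrated in a single C++ file. The class names are hypothetical stand-ins, not Impala's real RuntimeState. The header/.cc split is simulated with comments.

```cpp
#include <cassert>

// ---- Would live in a header, e.g. a hypothetical "fragment.h" ----
// A forward declaration is enough for pointer members and function
// signatures, so this header does not #include RuntimeState's definition.
class RuntimeState;

class Fragment {
 public:
  explicit Fragment(RuntimeState* state) : state_(state) {}
  RuntimeState* state() const { return state_; }
  // Defined out of line, where the complete RuntimeState type is visible.
  int Run() const;

 private:
  RuntimeState* state_;  // pointer to an incomplete type: fine in a header
};

// Note: a nested class can only be forward-declared inside its enclosing
// class's definition. Any header that names DiskIoMgr::RequestContext
// therefore needs DiskIoMgr's full definition. Moving such a class to
// namespace scope lets headers forward-declare it instead.

// ---- Would live in the matching .cc file ----
// Only this translation unit pays for the complete definition.
class RuntimeState {
 public:
  int num_rows() const { return 42; }
};

int Fragment::Run() const {
  assert(state_ != nullptr);
  return state_->num_rows();
}
```

Suppose "fragment.h" no longer includes the RuntimeState header. Then editing that header recompiles only the .cc files that include it directly, not every file that includes "fragment.h".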

The payoff is that touching many header files results in significantly
smaller incremental builds. E.g. changes to bloom-filter.h only require
recompiling a handful of files, instead of 100+.

Individual files should also compile slightly faster, since they pull
in fewer headers.

Change-Id: I3b246ad9c3681d649e7bfc969c7fa885c6242d84
Reviewed-on: http://gerrit.cloudera.org:8080/3108
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/6198d926
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/6198d926
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/6198d926

Branch: refs/heads/master
Commit: 6198d9262e6a9b5beb937e996b9afa942190bfdc
Parents: f09c631
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Tue Feb 23 18:10:26 2016 -0800
Committer: Tim Armstrong <ta...@cloudera.com>
Committed: Wed May 25 19:41:45 2016 -0700

----------------------------------------------------------------------
 .../benchmarks/row-batch-serialize-benchmark.cc |   1 +
 be/src/codegen/codegen-anyval.h                 |   2 +
 be/src/codegen/llvm-codegen-test.cc             |   2 +
 be/src/codegen/llvm-codegen.cc                  |   4 +
 be/src/common/status.h                          |   3 -
 be/src/exec/aggregation-node.cc                 |   2 +-
 be/src/exec/aggregation-node.h                  |   1 -
 be/src/exec/analytic-eval-node.cc               |   1 +
 be/src/exec/base-sequence-scanner.cc            |   1 +
 be/src/exec/blocking-join-node.cc               |   4 +-
 be/src/exec/catalog-op-executor.cc              |   1 +
 be/src/exec/data-sink.cc                        |   1 +
 be/src/exec/data-source-scan-node.cc            |   3 +-
 be/src/exec/exchange-node.cc                    |   2 +-
 be/src/exec/exec-node.cc                        |   2 +-
 be/src/exec/filter-context.cc                   |   2 +
 be/src/exec/hash-join-node.cc                   |   5 +-
 be/src/exec/hash-table-test.cc                  |   3 +-
 be/src/exec/hash-table.h                        |   1 -
 be/src/exec/hbase-scan-node.cc                  |   3 +-
 be/src/exec/hbase-table-scanner.cc              |   2 +
 be/src/exec/hbase-table-sink.cc                 |   2 +
 be/src/exec/hbase-table-writer.cc               |   6 +-
 be/src/exec/hbase-table-writer.h                |   3 +-
 be/src/exec/hdfs-avro-scanner-ir.cc             |   1 +
 be/src/exec/hdfs-avro-scanner.cc                |   2 +-
 be/src/exec/hdfs-avro-table-writer.cc           |   8 +
 be/src/exec/hdfs-avro-table-writer.h            |   2 +-
 be/src/exec/hdfs-parquet-scanner.cc             |   2 +-
 be/src/exec/hdfs-parquet-scanner.h              |   1 +
 be/src/exec/hdfs-parquet-table-writer.cc        |   2 +
 be/src/exec/hdfs-rcfile-scanner.cc              |   1 +
 be/src/exec/hdfs-scan-node.cc                   |   5 +-
 be/src/exec/hdfs-scan-node.h                    |   6 +-
 be/src/exec/hdfs-scanner.cc                     |   2 +-
 be/src/exec/hdfs-sequence-scanner.cc            |   1 +
 be/src/exec/hdfs-sequence-table-writer.cc       |   3 +
 be/src/exec/hdfs-table-writer.cc                |   1 +
 be/src/exec/hdfs-text-scanner.cc                |   1 +
 be/src/exec/hdfs-text-scanner.h                 |   1 +
 be/src/exec/hdfs-text-table-writer.cc           |   2 +
 be/src/exec/kudu-scan-node-test.cc              |   2 +
 be/src/exec/kudu-scan-node.cc                   |   9 +-
 be/src/exec/kudu-scanner.cc                     |   3 +-
 be/src/exec/kudu-table-sink-test.cc             |   3 +-
 be/src/exec/kudu-table-sink.cc                  |   3 +
 be/src/exec/kudu-testutil.h                     |   1 +
 be/src/exec/nested-loop-join-node.cc            |   3 +-
 be/src/exec/old-hash-table-test.cc              |   4 +-
 be/src/exec/old-hash-table.cc                   |   4 +
 be/src/exec/old-hash-table.h                    |   1 +
 be/src/exec/partitioned-aggregation-node.cc     |   3 +-
 be/src/exec/partitioned-aggregation-node.h      |   1 -
 be/src/exec/partitioned-hash-join-node.cc       |   4 +-
 be/src/exec/row-batch-list-test.cc              |   2 +-
 be/src/exec/scan-node.cc                        |   2 +
 be/src/exec/scanner-context.cc                  |   1 +
 be/src/exec/scanner-context.h                   |   1 -
 be/src/exec/select-node.cc                      |   2 +
 be/src/exec/sort-node.cc                        |   1 +
 be/src/exec/subplan-node.cc                     |   1 +
 be/src/exec/text-converter.cc                   |   1 +
 be/src/exec/topn-node.cc                        |   2 +-
 be/src/exec/union-node.cc                       |   2 +
 be/src/exec/unnest-node.cc                      |   4 +-
 be/src/experiments/data-provider.cc             |   2 +
 be/src/experiments/tuple-splitter-test.cc       |   2 +-
 be/src/exprs/agg-fn-evaluator.h                 |   2 +-
 be/src/exprs/compound-predicates.cc             |  16 +
 be/src/exprs/compound-predicates.h              |  12 +-
 be/src/exprs/expr-benchmark.cc                  |   1 +
 be/src/exprs/expr-codegen-test.cc               |   1 +
 be/src/exprs/expr-context.cc                    |   1 +
 be/src/exprs/expr-test.cc                       |   1 +
 be/src/exprs/expr.cc                            |   9 +
 be/src/exprs/expr.h                             |  24 +-
 be/src/exprs/in-predicate-ir.cc                 |   1 +
 be/src/exprs/math-functions-ir.cc               |   4 -
 be/src/exprs/slot-ref.cc                        |   4 +
 be/src/exprs/slot-ref.h                         |   1 +
 be/src/exprs/tuple-is-null-predicate.cc         |   2 +
 be/src/runtime/CMakeLists.txt                   |   1 +
 be/src/runtime/backend-client.h                 |   5 +-
 be/src/runtime/buffered-block-mgr.cc            |   2 +-
 be/src/runtime/buffered-block-mgr.h             |   2 +-
 be/src/runtime/buffered-tuple-stream.cc         |   1 +
 be/src/runtime/client-cache-types.h             |  42 ++
 be/src/runtime/client-cache.h                   |   4 +-
 be/src/runtime/collection-value-builder-test.cc |   1 +
 be/src/runtime/coordinator.cc                   |   2 +
 be/src/runtime/coordinator.h                    |   2 +-
 be/src/runtime/data-stream-mgr.cc               |   1 +
 be/src/runtime/data-stream-mgr.h                |   2 +-
 be/src/runtime/data-stream-recvr.cc             |   3 +-
 be/src/runtime/data-stream-sender.cc            |   1 +
 be/src/runtime/data-stream-sender.h             |   1 +
 be/src/runtime/data-stream-test.cc              |   3 +-
 be/src/runtime/descriptors.h                    |   1 -
 be/src/runtime/disk-io-mgr-internal.h           |  53 +-
 be/src/runtime/disk-io-mgr-reader-context.cc    |  42 +-
 be/src/runtime/disk-io-mgr-scan-range.cc        |   4 +-
 be/src/runtime/disk-io-mgr-stress.cc            |   2 +-
 be/src/runtime/disk-io-mgr-test.cc              |  38 +-
 be/src/runtime/disk-io-mgr.cc                   | 123 ++---
 be/src/runtime/disk-io-mgr.h                    |  75 +--
 be/src/runtime/exec-env.cc                      |   2 +
 be/src/runtime/exec-env.h                       |  28 +-
 be/src/runtime/lib-cache.cc                     |  11 +-
 be/src/runtime/lib-cache.h                      |   4 +-
 be/src/runtime/mem-pool.h                       |   1 -
 be/src/runtime/mem-tracker.h                    |   2 +-
 be/src/runtime/plan-fragment-executor.cc        |   5 +-
 be/src/runtime/plan-fragment-executor.h         |   1 +
 be/src/runtime/row-batch-serialize-test.cc      |   2 +
 be/src/runtime/row-batch.h                      |   1 -
 be/src/runtime/runtime-filter-bank.cc           | 222 +++++++++
 be/src/runtime/runtime-filter-bank.h            | 149 ++++++
 be/src/runtime/runtime-filter.cc                | 201 +-------
 be/src/runtime/runtime-filter.h                 | 127 +----
 be/src/runtime/runtime-state.cc                 |  12 +-
 be/src/runtime/runtime-state.h                  |  38 +-
 be/src/runtime/sorted-run-merger.cc             |   2 +-
 be/src/runtime/sorter.cc                        |   2 +-
 be/src/runtime/string-buffer.h                  |   1 +
 be/src/runtime/thread-resource-mgr.h            |   3 -
 be/src/runtime/tuple.cc                         |   1 +
 be/src/scheduling/admission-controller.cc       |   2 +-
 be/src/scheduling/query-resource-mgr.cc         |   1 +
 be/src/scheduling/simple-scheduler.cc           |   1 +
 be/src/service/fragment-exec-state.cc           |   1 +
 be/src/service/fragment-mgr.cc                  |   1 +
 be/src/service/impala-beeswax-server.cc         |   1 +
 be/src/service/impala-hs2-server.cc             |   3 +-
 be/src/service/query-exec-state.cc              |   3 +
 be/src/statestore/statestore-subscriber.h       |   7 +-
 be/src/testutil/desc-tbl-builder.cc             |   2 +-
 be/src/testutil/desc-tbl-builder.h              |   1 +
 be/src/util/auth-util.cc                        |   2 +
 be/src/util/auth-util.h                         |  16 +-
 be/src/util/avro-util.cc                        |   5 +-
 be/src/util/codec.h                             |   2 -
 be/src/util/decompress.cc                       |   1 +
 be/src/util/dict-encoding.h                     |   1 -
 be/src/util/hdfs-bulk-ops-defs.h                |  31 ++
 be/src/util/hdfs-bulk-ops.h                     |   5 +-
 be/src/util/metrics.h                           |   2 +-
 be/src/util/periodic-counter-updater.cc         |   1 +
 be/src/util/runtime-profile-counters.h          | 488 ++++++++++++++++++
 be/src/util/runtime-profile-test.cc             |   2 +-
 be/src/util/runtime-profile.cc                  |   2 +-
 be/src/util/runtime-profile.h                   | 497 +------------------
 be/src/util/simple-logger.cc                    |   1 +
 be/src/util/simple-logger.h                     |   2 +-
 be/src/util/streaming-sampler.h                 |   2 +
 be/src/util/tuple-row-compare.cc                |   1 +
 be/src/util/tuple-row-compare.h                 |   3 +-
 156 files changed, 1415 insertions(+), 1139 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/benchmarks/row-batch-serialize-benchmark.cc
----------------------------------------------------------------------
diff --git a/be/src/benchmarks/row-batch-serialize-benchmark.cc b/be/src/benchmarks/row-batch-serialize-benchmark.cc
index fa5a09c..b03d31b 100644
--- a/be/src/benchmarks/row-batch-serialize-benchmark.cc
+++ b/be/src/benchmarks/row-batch-serialize-benchmark.cc
@@ -15,6 +15,7 @@
 #include <iostream>
 #include <sstream>
 
+#include "runtime/mem-tracker.h"
 #include "runtime/raw-value.h"
 #include "runtime/row-batch.h"
 #include "runtime/tuple-row.h"

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/codegen/codegen-anyval.h
----------------------------------------------------------------------
diff --git a/be/src/codegen/codegen-anyval.h b/be/src/codegen/codegen-anyval.h
index cafe522..10e2150 100644
--- a/be/src/codegen/codegen-anyval.h
+++ b/be/src/codegen/codegen-anyval.h
@@ -16,6 +16,8 @@
 #define IMPALA_CODEGEN_CODEGEN_ANYVAL_H
 
 #include "codegen/llvm-codegen.h"
+#include "runtime/descriptors.h"
+#include "runtime/multi-precision.h"
 
 namespace llvm {
 class Type;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/codegen/llvm-codegen-test.cc
----------------------------------------------------------------------
diff --git a/be/src/codegen/llvm-codegen-test.cc b/be/src/codegen/llvm-codegen-test.cc
index 6167501..baf9921 100644
--- a/be/src/codegen/llvm-codegen-test.cc
+++ b/be/src/codegen/llvm-codegen-test.cc
@@ -19,6 +19,8 @@
 #include "testutil/gtest-util.h"
 #include "codegen/llvm-codegen.h"
 #include "common/init.h"
+#include "common/object-pool.h"
+#include "runtime/string-value.h"
 #include "util/cpu-info.h"
 #include "util/hash-util.h"
 #include "util/path-builder.h"

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/codegen/llvm-codegen.cc
----------------------------------------------------------------------
diff --git a/be/src/codegen/llvm-codegen.cc b/be/src/codegen/llvm-codegen.cc
index b7f0601..2bdbbcc 100644
--- a/be/src/codegen/llvm-codegen.cc
+++ b/be/src/codegen/llvm-codegen.cc
@@ -55,9 +55,13 @@
 #include "codegen/mcjit-mem-mgr.h"
 #include "impala-ir/impala-ir-names.h"
 #include "runtime/hdfs-fs-cache.h"
+#include "runtime/mem-pool.h"
+#include "runtime/string-value.h"
+#include "runtime/timestamp-value.h"
 #include "util/cpu-info.h"
 #include "util/hdfs-util.h"
 #include "util/path-builder.h"
+#include "util/runtime-profile-counters.h"
 #include "util/test-info.h"
 
 #include "common/names.h"

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/common/status.h
----------------------------------------------------------------------
diff --git a/be/src/common/status.h b/be/src/common/status.h
index 29631d0..f88bf50 100644
--- a/be/src/common/status.h
+++ b/be/src/common/status.h
@@ -19,9 +19,6 @@
 #include <string>
 #include <vector>
 
-#include <boost/lexical_cast.hpp>
-
-#include "codegen/impala-ir.h"
 #include "common/compiler-util.h"
 #include "common/logging.h"
 #include "gen-cpp/Status_types.h"  // for TStatus

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/exec/aggregation-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/aggregation-node.cc b/be/src/exec/aggregation-node.cc
index 509f961..95db201 100644
--- a/be/src/exec/aggregation-node.cc
+++ b/be/src/exec/aggregation-node.cc
@@ -38,7 +38,7 @@
 #include "runtime/tuple-row.h"
 #include "udf/udf-internal.h"
 #include "util/debug-util.h"
-#include "util/runtime-profile.h"
+#include "util/runtime-profile-counters.h"
 
 #include "gen-cpp/Exprs_types.h"
 #include "gen-cpp/PlanNodes_types.h"

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/exec/aggregation-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/aggregation-node.h b/be/src/exec/aggregation-node.h
index 95b0482..24ce37b 100644
--- a/be/src/exec/aggregation-node.h
+++ b/be/src/exec/aggregation-node.h
@@ -16,7 +16,6 @@
 #ifndef IMPALA_EXEC_AGGREGATION_NODE_H
 #define IMPALA_EXEC_AGGREGATION_NODE_H
 
-#include <functional>
 #include <boost/scoped_ptr.hpp>
 
 #include "exec/exec-node.h"

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/exec/analytic-eval-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/analytic-eval-node.cc b/be/src/exec/analytic-eval-node.cc
index 93960c9..d9fa6c3 100644
--- a/be/src/exec/analytic-eval-node.cc
+++ b/be/src/exec/analytic-eval-node.cc
@@ -22,6 +22,7 @@
 #include "runtime/row-batch.h"
 #include "runtime/runtime-state.h"
 #include "udf/udf-internal.h"
+#include "util/runtime-profile-counters.h"
 
 #include "common/names.h"
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/exec/base-sequence-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/base-sequence-scanner.cc b/be/src/exec/base-sequence-scanner.cc
index dadab51..4e0ac59 100644
--- a/be/src/exec/base-sequence-scanner.cc
+++ b/be/src/exec/base-sequence-scanner.cc
@@ -21,6 +21,7 @@
 #include "runtime/runtime-state.h"
 #include "runtime/string-search.h"
 #include "util/codec.h"
+#include "util/runtime-profile-counters.h"
 
 #include "common/names.h"
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/exec/blocking-join-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/blocking-join-node.cc b/be/src/exec/blocking-join-node.cc
index c7d5e4f..eb9704b 100644
--- a/be/src/exec/blocking-join-node.cc
+++ b/be/src/exec/blocking-join-node.cc
@@ -19,8 +19,10 @@
 #include "exprs/expr.h"
 #include "runtime/row-batch.h"
 #include "runtime/runtime-state.h"
+#include "runtime/tuple-row.h"
+#include "util/cgroups-mgr.h"
 #include "util/debug-util.h"
-#include "util/runtime-profile.h"
+#include "util/runtime-profile-counters.h"
 #include "util/time.h"
 
 #include "gen-cpp/PlanNodes_types.h"

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/exec/catalog-op-executor.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/catalog-op-executor.cc b/be/src/exec/catalog-op-executor.cc
index 77a2e82..42ef2e0 100644
--- a/be/src/exec/catalog-op-executor.cc
+++ b/be/src/exec/catalog-op-executor.cc
@@ -22,6 +22,7 @@
 #include "service/impala-server.h"
 #include "service/hs2-util.h"
 #include "util/string-parser.h"
+#include "util/runtime-profile-counters.h"
 #include "gen-cpp/CatalogService.h"
 #include "gen-cpp/CatalogService_types.h"
 #include "gen-cpp/CatalogObjects_types.h"

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/exec/data-sink.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/data-sink.cc b/be/src/exec/data-sink.cc
index 0ed6ab0..dc3a17c 100644
--- a/be/src/exec/data-sink.cc
+++ b/be/src/exec/data-sink.cc
@@ -27,6 +27,7 @@
 #include "gen-cpp/ImpalaInternalService_types.h"
 #include "gen-cpp/ImpalaInternalService_constants.h"
 #include "runtime/data-stream-sender.h"
+#include "runtime/mem-tracker.h"
 #include "util/container-util.h"
 
 #include "common/names.h"

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/exec/data-source-scan-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/data-source-scan-node.cc b/be/src/exec/data-source-scan-node.cc
index 1c1aae5..76e86e9 100644
--- a/be/src/exec/data-source-scan-node.cc
+++ b/be/src/exec/data-source-scan-node.cc
@@ -21,13 +21,14 @@
 #include "exec/read-write-util.h"
 #include "exprs/expr.h"
 #include "runtime/mem-pool.h"
+#include "runtime/mem-tracker.h"
 #include "runtime/runtime-state.h"
 #include "runtime/row-batch.h"
 #include "runtime/string-value.h"
 #include "runtime/tuple-row.h"
 #include "util/jni-util.h"
 #include "util/periodic-counter-updater.h"
-#include "util/runtime-profile.h"
+#include "util/runtime-profile-counters.h"
 
 #include "common/names.h"
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/exec/exchange-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/exchange-node.cc b/be/src/exec/exchange-node.cc
index b2c3f4e..395bc0e 100644
--- a/be/src/exec/exchange-node.cc
+++ b/be/src/exec/exchange-node.cc
@@ -21,7 +21,7 @@
 #include "runtime/runtime-state.h"
 #include "runtime/row-batch.h"
 #include "util/debug-util.h"
-#include "util/runtime-profile.h"
+#include "util/runtime-profile-counters.h"
 #include "util/time.h"
 #include "gen-cpp/PlanNodes_types.h"
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/exec/exec-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/exec-node.cc b/be/src/exec/exec-node.cc
index fdde613..b0f07d0 100644
--- a/be/src/exec/exec-node.cc
+++ b/be/src/exec/exec-node.cc
@@ -50,7 +50,7 @@
 #include "runtime/row-batch.h"
 #include "runtime/runtime-state.h"
 #include "util/debug-util.h"
-#include "util/runtime-profile.h"
+#include "util/runtime-profile-counters.h"
 
 #include "common/names.h"
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/exec/filter-context.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/filter-context.cc b/be/src/exec/filter-context.cc
index e65f0ac..b4cd290 100644
--- a/be/src/exec/filter-context.cc
+++ b/be/src/exec/filter-context.cc
@@ -14,6 +14,8 @@
 
 #include "exec/filter-context.h"
 
+#include "util/runtime-profile-counters.h"
+
 using namespace impala;
 using namespace strings;
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/exec/hash-join-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hash-join-node.cc b/be/src/exec/hash-join-node.cc
index 7163c88..9a66d36 100644
--- a/be/src/exec/hash-join-node.cc
+++ b/be/src/exec/hash-join-node.cc
@@ -23,10 +23,13 @@
 #include "exprs/expr.h"
 #include "gutil/strings/substitute.h"
 #include "runtime/row-batch.h"
+#include "runtime/runtime-filter.h"
+#include "runtime/runtime-filter-bank.h"
 #include "runtime/runtime-state.h"
+#include "runtime/tuple-row.h"
 #include "util/debug-util.h"
 #include "util/bloom-filter.h"
-#include "util/runtime-profile.h"
+#include "util/runtime-profile-counters.h"
 
 #include "gen-cpp/PlanNodes_types.h"
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/exec/hash-table-test.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hash-table-test.cc b/be/src/exec/hash-table-test.cc
index d6cd196..3a0b0b1 100644
--- a/be/src/exec/hash-table-test.cc
+++ b/be/src/exec/hash-table-test.cc
@@ -30,9 +30,10 @@
 #include "runtime/mem-tracker.h"
 #include "runtime/string-value.h"
 #include "runtime/test-env.h"
+#include "runtime/tuple-row.h"
 #include "service/fe-support.h"
 #include "util/cpu-info.h"
-#include "util/runtime-profile.h"
+#include "util/runtime-profile-counters.h"
 #include "util/test-info.h"
 
 #include "common/names.h"

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/exec/hash-table.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hash-table.h b/be/src/exec/hash-table.h
index 55dd120..5b466dc 100644
--- a/be/src/exec/hash-table.h
+++ b/be/src/exec/hash-table.h
@@ -25,7 +25,6 @@
 #include "runtime/buffered-block-mgr.h"
 #include "runtime/buffered-tuple-stream.h"
 #include "runtime/buffered-tuple-stream.inline.h"
-#include "runtime/mem-tracker.h"
 #include "runtime/tuple-row.h"
 #include "util/bitmap.h"
 #include "util/hash-util.h"

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/exec/hbase-scan-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hbase-scan-node.cc b/be/src/exec/hbase-scan-node.cc
index f628c0d..ab649d6 100644
--- a/be/src/exec/hbase-scan-node.cc
+++ b/be/src/exec/hbase-scan-node.cc
@@ -16,6 +16,7 @@
 
 #include <algorithm>
 
+#include "runtime/mem-tracker.h"
 #include "runtime/runtime-state.h"
 #include "runtime/row-batch.h"
 #include "runtime/string-value.h"
@@ -23,7 +24,7 @@
 #include "runtime/tuple.h"
 #include "util/jni-util.h"
 #include "util/periodic-counter-updater.h"
-#include "util/runtime-profile.h"
+#include "util/runtime-profile-counters.h"
 #include "gen-cpp/PlanNodes_types.h"
 #include "exec/text-converter.inline.h"
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/exec/hbase-table-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hbase-table-scanner.cc b/be/src/exec/hbase-table-scanner.cc
index 3b25429..5c6e45b 100644
--- a/be/src/exec/hbase-table-scanner.cc
+++ b/be/src/exec/hbase-table-scanner.cc
@@ -23,7 +23,9 @@
 #include "runtime/descriptors.h"
 #include "runtime/runtime-state.h"
 #include "runtime/mem-pool.h"
+#include "runtime/mem-tracker.h"
 #include "runtime/tuple.h"
+#include "util/runtime-profile-counters.h"
 
 #include "common/names.h"
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/exec/hbase-table-sink.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hbase-table-sink.cc b/be/src/exec/hbase-table-sink.cc
index 8ddccb7..b1afcd0 100644
--- a/be/src/exec/hbase-table-sink.cc
+++ b/be/src/exec/hbase-table-sink.cc
@@ -20,6 +20,8 @@
 #include "exprs/expr.h"
 #include "exprs/expr-context.h"
 #include "gen-cpp/ImpalaInternalService_constants.h"
+#include "runtime/mem-tracker.h"
+#include "util/runtime-profile-counters.h"
 
 #include "common/names.h"
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/exec/hbase-table-writer.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hbase-table-writer.cc b/be/src/exec/hbase-table-writer.cc
index eeb3fef..5e047e1 100644
--- a/be/src/exec/hbase-table-writer.cc
+++ b/be/src/exec/hbase-table-writer.cc
@@ -18,11 +18,15 @@
 #include <sstream>
 
 #include "common/logging.h"
+#include "exprs/expr.h"
 #include "exprs/expr-context.h"
 #include "runtime/hbase-table-factory.h"
+#include "runtime/mem-tracker.h"
+#include "runtime/raw-value.h"
+#include "runtime/tuple.h"
+#include "runtime/tuple-row.h"
 #include "util/bit-util.h"
 #include "util/jni-util.h"
-#include "exprs/expr.h"
 
 #include "common/names.h"
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/exec/hbase-table-writer.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hbase-table-writer.h b/be/src/exec/hbase-table-writer.h
index 2bb6f15..2bb0df0 100644
--- a/be/src/exec/hbase-table-writer.h
+++ b/be/src/exec/hbase-table-writer.h
@@ -25,11 +25,12 @@
 #include "common/status.h"
 #include "runtime/runtime-state.h"
 #include "runtime/descriptors.h"
-#include "runtime/row-batch.h"
 #include "runtime/hbase-table.h"
 
 namespace impala {
 
+class RowBatch;
+
 /// Class to write RowBatches to an HBase table using the java HTable client.
 /// This class should only be called from a single sink and should not be
 /// shared.

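In the hbase-table-writer.h hunk above, `#include "runtime/row-batch.h"` is replaced by the forward declaration `class RowBatch;`. This works because a header that names a type only through pointers or references needs an incomplete type, not the full definition. Only the .cc file that actually calls the type's members needs the include. The single-file sketch below illustrates the technique. `impala_sketch`, `TableWriterSketch`, and the `num_rows()` member are hypothetical stand-ins, not Impala's real classes.

```cpp
#include <cassert>
#include <cstddef>

namespace impala_sketch {  // hypothetical namespace for illustration only

// --- "Header" part: a forward declaration is enough, because RowBatch is
// only referred to through a pointer in the declarations below.
class RowBatch;

class TableWriterSketch {
 public:
  // Declaring a function that takes RowBatch* needs only the incomplete type.
  std::size_t AppendRowBatch(RowBatch* batch);
  std::size_t rows_written() const { return rows_written_; }

 private:
  std::size_t rows_written_ = 0;
};

// --- "Source file" part: the full definition is visible from here on, so
// RowBatch's members can be called (in a real project, via an #include).
class RowBatch {
 public:
  explicit RowBatch(std::size_t num_rows) : num_rows_(num_rows) {}
  std::size_t num_rows() const { return num_rows_; }

 private:
  std::size_t num_rows_;
};

std::size_t TableWriterSketch::AppendRowBatch(RowBatch* batch) {
  assert(batch != nullptr);
  rows_written_ += batch->num_rows();  // requires the complete type
  return rows_written_;
}

}  // namespace impala_sketch
```

The payoff is that files including the header no longer recompile whenever the row-batch header changes.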
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/exec/hdfs-avro-scanner-ir.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-avro-scanner-ir.cc b/be/src/exec/hdfs-avro-scanner-ir.cc
index f3ebb97..84ca502 100644
--- a/be/src/exec/hdfs-avro-scanner-ir.cc
+++ b/be/src/exec/hdfs-avro-scanner-ir.cc
@@ -16,6 +16,7 @@
 
 #include "exec/hdfs-avro-scanner.h"
 #include "exec/read-write-util.h"
+#include "runtime/mem-tracker.h"
 #include "runtime/string-value.inline.h"
 
 using namespace impala;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/exec/hdfs-avro-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-avro-scanner.cc b/be/src/exec/hdfs-avro-scanner.cc
index 5f469b7..ec8ec26 100644
--- a/be/src/exec/hdfs-avro-scanner.cc
+++ b/be/src/exec/hdfs-avro-scanner.cc
@@ -26,7 +26,7 @@
 #include "runtime/runtime-state.h"
 #include "util/codec.h"
 #include "util/decompress.h"
-#include "util/runtime-profile.h"
+#include "util/runtime-profile-counters.h"
 
 #include "common/names.h"
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/exec/hdfs-avro-table-writer.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-avro-table-writer.cc b/be/src/exec/hdfs-avro-table-writer.cc
index 3a7e001..9ee704d 100644
--- a/be/src/exec/hdfs-avro-table-writer.cc
+++ b/be/src/exec/hdfs-avro-table-writer.cc
@@ -26,9 +26,13 @@
 #include "util/uid-util.h"
 #include "exprs/expr.h"
 #include "exprs/expr-context.h"
+#include "runtime/mem-pool.h"
+#include "runtime/mem-tracker.h"
+#include "runtime/raw-value.h"
 #include "runtime/row-batch.h"
 #include "runtime/runtime-state.h"
 #include "runtime/hdfs-fs-cache.h"
+#include "util/runtime-profile-counters.h"
 #include "write-stream.inline.h"
 
 #include "common/names.h"
@@ -163,6 +167,10 @@ Status HdfsAvroTableWriter::Init() {
   return Status::OK();
 }
 
+void HdfsAvroTableWriter::Close() {
+  mem_pool_->FreeAll();
+}
+
 Status HdfsAvroTableWriter::AppendRowBatch(RowBatch* batch,
     const vector<int32_t>& row_group_indices, bool* new_file) {
   int32_t limit;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/exec/hdfs-avro-table-writer.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-avro-table-writer.h b/be/src/exec/hdfs-avro-table-writer.h
index 2cb5138..e8315f7 100644
--- a/be/src/exec/hdfs-avro-table-writer.h
+++ b/be/src/exec/hdfs-avro-table-writer.h
@@ -66,7 +66,7 @@ class HdfsAvroTableWriter : public HdfsTableWriter {
   virtual Status Init();
   virtual Status Finalize() { return Flush(); }
   virtual Status InitNewFile() { return WriteFileHeader(); }
-  virtual void Close() { mem_pool_->FreeAll(); }
+  virtual void Close();
   virtual uint64_t default_block_size() const { return 0; }
   virtual std::string file_extension() const { return "avro"; }
 
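The two hdfs-avro-table-writer hunks move the body of `Close()` out of the class definition and into the .cc file, which also gains `#include "runtime/mem-pool.h"`. A plausible reason, not stated in the diff, is that calling `mem_pool_->FreeAll()` inline requires `MemPool`'s full definition in every file that includes the header. An out-of-line definition does not. The following is a minimal sketch assuming a `std::unique_ptr` member; the real member type is not shown in this diff. `avro_sketch`, `WriterSketch`, and `closed()` are hypothetical names.

```cpp
#include <cassert>
#include <memory>

namespace avro_sketch {  // hypothetical namespace for illustration only

// --- "Header" part: MemPool is only forward-declared.
class MemPool;

class WriterSketch {
 public:
  WriterSketch();
  ~WriterSketch();     // defined below, where MemPool is complete
  void Close();        // declared only: its body needs MemPool's definition
  bool closed() const;

 private:
  std::unique_ptr<MemPool> mem_pool_;
};

// --- "Source file" part: the full definition is available here.
class MemPool {
 public:
  void FreeAll() { freed_ = true; }
  bool freed() const { return freed_; }

 private:
  bool freed_ = false;
};

WriterSketch::WriterSketch() : mem_pool_(new MemPool()) {}
WriterSketch::~WriterSketch() = default;

void WriterSketch::Close() {
  assert(mem_pool_ != nullptr);
  mem_pool_->FreeAll();  // member access requires the complete type
}

bool WriterSketch::closed() const { return mem_pool_->freed(); }

}  // namespace avro_sketch
```

With `std::unique_ptr` to an incomplete type, the destructor must also be defined where the type is complete. That is why `~WriterSketch()` is defaulted out of line rather than in the class body.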

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/exec/hdfs-parquet-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner.cc b/be/src/exec/hdfs-parquet-scanner.cc
index d9ce76f..4bb7bac 100644
--- a/be/src/exec/hdfs-parquet-scanner.cc
+++ b/be/src/exec/hdfs-parquet-scanner.cc
@@ -44,7 +44,7 @@
 #include "util/error-util.h"
 #include "util/dict-encoding.h"
 #include "util/rle-encoding.h"
-#include "util/runtime-profile.h"
+#include "util/runtime-profile-counters.h"
 #include "rpc/thrift-util.h"
 
 #include "common/names.h"

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/exec/hdfs-parquet-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner.h b/be/src/exec/hdfs-parquet-scanner.h
index 0700cbe..5a12602 100644
--- a/be/src/exec/hdfs-parquet-scanner.h
+++ b/be/src/exec/hdfs-parquet-scanner.h
@@ -18,6 +18,7 @@
 
 #include "exec/hdfs-scanner.h"
 #include "exec/parquet-common.h"
+#include "util/runtime-profile-counters.h"
 
 namespace impala {
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/exec/hdfs-parquet-table-writer.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-table-writer.cc b/be/src/exec/hdfs-parquet-table-writer.cc
index 3743503..6193da7 100644
--- a/be/src/exec/hdfs-parquet-table-writer.cc
+++ b/be/src/exec/hdfs-parquet-table-writer.cc
@@ -18,6 +18,8 @@
 #include "exprs/expr.h"
 #include "exprs/expr-context.h"
 #include "runtime/decimal-value.h"
+#include "runtime/mem-tracker.h"
+#include "runtime/raw-value.h"
 #include "runtime/row-batch.h"
 #include "runtime/runtime-state.h"
 #include "runtime/string-value.inline.h"

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/exec/hdfs-rcfile-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-rcfile-scanner.cc b/be/src/exec/hdfs-rcfile-scanner.cc
index ca24394..d85ad49 100644
--- a/be/src/exec/hdfs-rcfile-scanner.cc
+++ b/be/src/exec/hdfs-rcfile-scanner.cc
@@ -29,6 +29,7 @@
 #include "runtime/string-value.h"
 #include "util/codec.h"
 #include "util/string-parser.h"
+#include "util/runtime-profile-counters.h"
 
 #include "gen-cpp/PlanNodes_types.h"
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/exec/hdfs-scan-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node.cc b/be/src/exec/hdfs-scan-node.cc
index 4511867..cb4f2f7 100644
--- a/be/src/exec/hdfs-scan-node.cc
+++ b/be/src/exec/hdfs-scan-node.cc
@@ -39,8 +39,11 @@
 #include "runtime/runtime-filter.inline.h"
 #include "runtime/runtime-state.h"
 #include "runtime/mem-pool.h"
+#include "runtime/mem-tracker.h"
 #include "runtime/raw-value.h"
 #include "runtime/row-batch.h"
+#include "runtime/string-buffer.h"
+#include "scheduling/query-resource-mgr.h"
 #include "util/bit-util.h"
 #include "util/container-util.h"
 #include "util/debug-util.h"
@@ -49,7 +52,7 @@
 #include "util/hdfs-util.h"
 #include "util/impalad-metrics.h"
 #include "util/periodic-counter-updater.h"
-#include "util/runtime-profile.h"
+#include "util/runtime-profile-counters.h"
 
 #include "gen-cpp/PlanNodes_types.h"
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/exec/hdfs-scan-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node.h b/be/src/exec/hdfs-scan-node.h
index 93253a2..f0eb8e3 100644
--- a/be/src/exec/hdfs-scan-node.h
+++ b/be/src/exec/hdfs-scan-node.h
@@ -25,14 +25,12 @@
 #include <boost/scoped_ptr.hpp>
 #include <boost/thread/condition_variable.hpp>
 #include <boost/thread/mutex.hpp>
-#include <boost/thread/thread.hpp>
 
 #include "exec/filter-context.h"
 #include "exec/scan-node.h"
 #include "exec/scanner-context.h"
 #include "runtime/descriptors.h"
 #include "runtime/disk-io-mgr.h"
-#include "runtime/string-buffer.h"
 #include "util/avro-util.h"
 #include "util/counting-barrier.h"
 #include "util/progress-updater.h"
@@ -164,7 +162,7 @@ class HdfsScanNode : public ScanNode {
 
   int skip_header_line_count() const { return skip_header_line_count_; }
 
-  DiskIoMgr::RequestContext* reader_context() { return reader_context_; }
+  DiskIoRequestContext* reader_context() { return reader_context_; }
 
   typedef std::map<TupleId, std::vector<ExprContext*> > ConjunctsMap;
   const ConjunctsMap& conjuncts_map() const { return conjuncts_map_; }
@@ -325,7 +323,7 @@ class HdfsScanNode : public ScanNode {
   const int tuple_id_;
 
   /// RequestContext object to use with the disk-io-mgr for reads.
-  DiskIoMgr::RequestContext* reader_context_;
+  DiskIoRequestContext* reader_context_;
 
   /// Descriptor for tuples this scan node constructs
   const TupleDescriptor* tuple_desc_;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/exec/hdfs-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scanner.cc b/be/src/exec/hdfs-scanner.cc
index 6e9eb61..74a5efb 100644
--- a/be/src/exec/hdfs-scanner.cc
+++ b/be/src/exec/hdfs-scanner.cc
@@ -38,7 +38,7 @@
 #include "util/bitmap.h"
 #include "util/codec.h"
 #include "util/debug-util.h"
-#include "util/runtime-profile.h"
+#include "util/runtime-profile-counters.h"
 #include "util/sse-util.h"
 #include "util/string-parser.h"
 #include "gen-cpp/PlanNodes_types.h"

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/exec/hdfs-sequence-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-sequence-scanner.cc b/be/src/exec/hdfs-sequence-scanner.cc
index 168cdd1..1460b1a 100644
--- a/be/src/exec/hdfs-sequence-scanner.cc
+++ b/be/src/exec/hdfs-sequence-scanner.cc
@@ -24,6 +24,7 @@
 #include "runtime/tuple.h"
 #include "runtime/tuple-row.h"
 #include "util/codec.h"
+#include "util/runtime-profile-counters.h"
 
 #include "common/names.h"
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/exec/hdfs-sequence-table-writer.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-sequence-table-writer.cc b/be/src/exec/hdfs-sequence-table-writer.cc
index dd8f986..19f3202 100644
--- a/be/src/exec/hdfs-sequence-table-writer.cc
+++ b/be/src/exec/hdfs-sequence-table-writer.cc
@@ -19,9 +19,12 @@
 #include "util/uid-util.h"
 #include "exprs/expr.h"
 #include "exprs/expr-context.h"
+#include "runtime/mem-tracker.h"
+#include "runtime/raw-value.h"
 #include "runtime/row-batch.h"
 #include "runtime/runtime-state.h"
 #include "runtime/hdfs-fs-cache.h"
+#include "util/runtime-profile-counters.h"
 
 #include <vector>
 #include <hdfs.h>

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/exec/hdfs-table-writer.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-table-writer.cc b/be/src/exec/hdfs-table-writer.cc
index 4a55d5e..e75c305 100644
--- a/be/src/exec/hdfs-table-writer.cc
+++ b/be/src/exec/hdfs-table-writer.cc
@@ -15,6 +15,7 @@
 #include "exec/hdfs-table-writer.h"
 
 #include "common/names.h"
+#include "runtime/mem-tracker.h"
 
 namespace impala {
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/exec/hdfs-text-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-text-scanner.cc b/be/src/exec/hdfs-text-scanner.cc
index f3b669d..6e501cc 100644
--- a/be/src/exec/hdfs-text-scanner.cc
+++ b/be/src/exec/hdfs-text-scanner.cc
@@ -24,6 +24,7 @@
 #include "exec/text-converter.inline.h"
 #include "runtime/row-batch.h"
 #include "runtime/runtime-state.h"
+#include "runtime/tuple-row.h"
 #include "util/codec.h"
 #include "util/decompress.h"
 #include "util/cpu-info.h"

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/exec/hdfs-text-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-text-scanner.h b/be/src/exec/hdfs-text-scanner.h
index 03b8343..997637d 100644
--- a/be/src/exec/hdfs-text-scanner.h
+++ b/be/src/exec/hdfs-text-scanner.h
@@ -18,6 +18,7 @@
 
 #include "exec/hdfs-scanner.h"
 #include "runtime/string-buffer.h"
+#include "util/runtime-profile-counters.h"
 
 namespace impala {
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/exec/hdfs-text-table-writer.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-text-table-writer.cc b/be/src/exec/hdfs-text-table-writer.cc
index 9936237..37ccd3e 100644
--- a/be/src/exec/hdfs-text-table-writer.cc
+++ b/be/src/exec/hdfs-text-table-writer.cc
@@ -17,6 +17,7 @@
 #include "exprs/expr.h"
 #include "exprs/expr-context.h"
 #include "runtime/hdfs-fs-cache.h"
+#include "runtime/mem-tracker.h"
 #include "runtime/raw-value.h"
 #include "runtime/row-batch.h"
 #include "runtime/runtime-state.h"
@@ -24,6 +25,7 @@
 #include "util/codec.h"
 #include "util/compress.h"
 #include "util/hdfs-util.h"
+#include "util/runtime-profile-counters.h"
 
 #include <hdfs.h>
 #include <stdlib.h>

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/exec/kudu-scan-node-test.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-scan-node-test.cc b/be/src/exec/kudu-scan-node-test.cc
index b7453ee..7082991 100644
--- a/be/src/exec/kudu-scan-node-test.cc
+++ b/be/src/exec/kudu-scan-node-test.cc
@@ -24,7 +24,9 @@
 #include "gen-cpp/Types_types.h"
 #include "gutil/strings/split.h"
 #include "runtime/descriptors.h"
+#include "runtime/mem-tracker.h"
 #include "runtime/row-batch.h"
+#include "runtime/tuple-row.h"
 #include "runtime/runtime-state.h"
 #include "service/fe-support.h"
 #include "testutil/desc-tbl-builder.h"

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/exec/kudu-scan-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-scan-node.cc b/be/src/exec/kudu-scan-node.cc
index 3d12f4f..a08034f 100644
--- a/be/src/exec/kudu-scan-node.cc
+++ b/be/src/exec/kudu-scan-node.cc
@@ -24,18 +24,19 @@
 #include "exec/kudu-scanner.h"
 #include "exec/kudu-util.h"
 #include "exprs/expr.h"
+#include "gutil/gscoped_ptr.h"
+#include "gutil/strings/substitute.h"
+#include "gutil/stl_util.h"
 #include "runtime/mem-pool.h"
 #include "runtime/runtime-state.h"
 #include "runtime/row-batch.h"
 #include "runtime/string-value.h"
 #include "runtime/tuple-row.h"
-#include "gutil/gscoped_ptr.h"
-#include "gutil/strings/substitute.h"
-#include "gutil/stl_util.h"
+#include "scheduling/query-resource-mgr.h"
 #include "util/disk-info.h"
 #include "util/jni-util.h"
 #include "util/periodic-counter-updater.h"
-#include "util/runtime-profile.h"
+#include "util/runtime-profile-counters.h"
 
 #include "common/names.h"
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/exec/kudu-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-scanner.cc b/be/src/exec/kudu-scanner.cc
index db98f8d..ac844c7 100644
--- a/be/src/exec/kudu-scanner.cc
+++ b/be/src/exec/kudu-scanner.cc
@@ -22,6 +22,7 @@
 #include "exprs/expr-context.h"
 #include "exec/kudu-util.h"
 #include "runtime/mem-pool.h"
+#include "runtime/mem-tracker.h"
 #include "runtime/runtime-state.h"
 #include "runtime/row-batch.h"
 #include "runtime/string-value.h"
@@ -30,7 +31,7 @@
 #include "gutil/strings/substitute.h"
 #include "util/jni-util.h"
 #include "util/periodic-counter-updater.h"
-#include "util/runtime-profile.h"
+#include "util/runtime-profile-counters.h"
 
 #include "common/names.h"
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/exec/kudu-table-sink-test.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-table-sink-test.cc b/be/src/exec/kudu-table-sink-test.cc
index 1d5c642..f7919dc 100644
--- a/be/src/exec/kudu-table-sink-test.cc
+++ b/be/src/exec/kudu-table-sink-test.cc
@@ -15,7 +15,6 @@
 #include "exec/kudu-testutil.h"
 
 #include "common/init.h"
-#include "common/object-pool.h"
 #include "codegen/llvm-codegen.h"
 #include "exec/kudu-table-sink.h"
 #include "exec/kudu-util.h"
@@ -26,8 +25,10 @@
 #include "gutil/stl_util.h"
 #include "kudu/client/row_result.h"
 #include "runtime/descriptors.h"
+#include "runtime/mem-tracker.h"
 #include "runtime/row-batch.h"
 #include "runtime/runtime-state.h"
+#include "runtime/tuple-row.h"
 #include "service/fe-support.h"
 #include "testutil/desc-tbl-builder.h"
 #include "testutil/test-macros.h"

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/exec/kudu-table-sink.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-table-sink.cc b/be/src/exec/kudu-table-sink.cc
index c3b91b7..055292e 100644
--- a/be/src/exec/kudu-table-sink.cc
+++ b/be/src/exec/kudu-table-sink.cc
@@ -22,6 +22,9 @@
 #include "exprs/expr-context.h"
 #include "gen-cpp/ImpalaInternalService_constants.h"
 #include "gutil/gscoped_ptr.h"
+#include "runtime/mem-tracker.h"
+#include "runtime/row-batch.h"
+#include "util/runtime-profile-counters.h"
 
 #include "common/names.h"
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/exec/kudu-testutil.h
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-testutil.h b/be/src/exec/kudu-testutil.h
index 6849e74..083a2c7 100644
--- a/be/src/exec/kudu-testutil.h
+++ b/be/src/exec/kudu-testutil.h
@@ -24,6 +24,7 @@
 #include <tr1/memory>
 #include <vector>
 
+#include "common/object-pool.h"
 #include "gutil/gscoped_ptr.h"
 #include "runtime/exec-env.h"
 #include "testutil/desc-tbl-builder.h"

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/exec/nested-loop-join-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/nested-loop-join-node.cc b/be/src/exec/nested-loop-join-node.cc
index a12957e..149e8d3 100644
--- a/be/src/exec/nested-loop-join-node.cc
+++ b/be/src/exec/nested-loop-join-node.cc
@@ -20,11 +20,12 @@
 #include "exec/row-batch-cache.h"
 #include "exprs/expr.h"
 #include "runtime/mem-pool.h"
+#include "runtime/mem-tracker.h"
 #include "runtime/row-batch.h"
 #include "runtime/runtime-state.h"
 #include "util/bitmap.h"
 #include "util/debug-util.h"
-#include "util/runtime-profile.h"
+#include "util/runtime-profile-counters.h"
 #include "gen-cpp/PlanNodes_types.h"
 #include "common/names.h"
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/exec/old-hash-table-test.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/old-hash-table-test.cc b/be/src/exec/old-hash-table-test.cc
index 0118f1d..eeccd76 100644
--- a/be/src/exec/old-hash-table-test.cc
+++ b/be/src/exec/old-hash-table-test.cc
@@ -26,9 +26,9 @@
 #include "runtime/mem-pool.h"
 #include "runtime/mem-tracker.h"
 #include "runtime/string-value.h"
-#include "runtime/mem-tracker.h"
+#include "runtime/tuple-row.h"
 #include "util/cpu-info.h"
-#include "util/runtime-profile.h"
+#include "util/runtime-profile-counters.h"
 
 #include "common/names.h"
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/exec/old-hash-table.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/old-hash-table.cc b/be/src/exec/old-hash-table.cc
index 4d419d2..b2109c0 100644
--- a/be/src/exec/old-hash-table.cc
+++ b/be/src/exec/old-hash-table.cc
@@ -24,9 +24,13 @@
 #include "exprs/slot-ref.h"
 #include "runtime/mem-tracker.h"
 #include "runtime/raw-value.inline.h"
+#include "runtime/runtime-filter.h"
+#include "runtime/runtime-filter-bank.h"
 #include "runtime/runtime-state.h"
 #include "runtime/string-value.inline.h"
+#include "runtime/tuple-row.h"
 #include "util/bloom-filter.h"
+#include "runtime/tuple.h"
 #include "util/debug-util.h"
 #include "util/error-util.h"
 #include "util/impalad-metrics.h"

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/exec/old-hash-table.h
----------------------------------------------------------------------
diff --git a/be/src/exec/old-hash-table.h b/be/src/exec/old-hash-table.h
index 0645b8d..ad28512 100644
--- a/be/src/exec/old-hash-table.h
+++ b/be/src/exec/old-hash-table.h
@@ -18,6 +18,7 @@
 
 #include <vector>
 #include <boost/cstdint.hpp>
+#include <boost/scoped_ptr.hpp>
 #include "codegen/impala-ir.h"
 #include "common/logging.h"
 #include "runtime/mem-pool.h"

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/exec/partitioned-aggregation-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-aggregation-node.cc b/be/src/exec/partitioned-aggregation-node.cc
index 51478fc..33ddfc4 100644
--- a/be/src/exec/partitioned-aggregation-node.cc
+++ b/be/src/exec/partitioned-aggregation-node.cc
@@ -32,6 +32,7 @@
 #include "runtime/buffered-tuple-stream.inline.h"
 #include "runtime/descriptors.h"
 #include "runtime/mem-pool.h"
+#include "runtime/mem-tracker.h"
 #include "runtime/raw-value.h"
 #include "runtime/row-batch.h"
 #include "runtime/runtime-state.h"
@@ -40,7 +41,7 @@
 #include "runtime/tuple-row.h"
 #include "udf/udf-internal.h"
 #include "util/debug-util.h"
-#include "util/runtime-profile.h"
+#include "util/runtime-profile-counters.h"
 
 #include "gen-cpp/Exprs_types.h"
 #include "gen-cpp/PlanNodes_types.h"

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/exec/partitioned-aggregation-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-aggregation-node.h b/be/src/exec/partitioned-aggregation-node.h
index 300b9e0..b2f8587 100644
--- a/be/src/exec/partitioned-aggregation-node.h
+++ b/be/src/exec/partitioned-aggregation-node.h
@@ -16,7 +16,6 @@
 #ifndef IMPALA_EXEC_PARTITIONED_AGGREGATION_NODE_H
 #define IMPALA_EXEC_PARTITIONED_AGGREGATION_NODE_H
 
-#include <functional>
 #include <boost/scoped_ptr.hpp>
 
 #include "exec/exec-node.h"

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/exec/partitioned-hash-join-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-hash-join-node.cc b/be/src/exec/partitioned-hash-join-node.cc
index e45cbb6..640c674 100644
--- a/be/src/exec/partitioned-hash-join-node.cc
+++ b/be/src/exec/partitioned-hash-join-node.cc
@@ -26,12 +26,14 @@
 #include "exprs/slot-ref.h"
 #include "runtime/buffered-block-mgr.h"
 #include "runtime/buffered-tuple-stream.inline.h"
+#include "runtime/mem-tracker.h"
 #include "runtime/row-batch.h"
 #include "runtime/runtime-filter.h"
+#include "runtime/runtime-filter-bank.h"
 #include "runtime/runtime-state.h"
 #include "util/bloom-filter.h"
 #include "util/debug-util.h"
-#include "util/runtime-profile.h"
+#include "util/runtime-profile-counters.h"
 
 #include "gen-cpp/PlanNodes_types.h"
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/exec/row-batch-list-test.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/row-batch-list-test.cc b/be/src/exec/row-batch-list-test.cc
index 5a634f6..c834255 100644
--- a/be/src/exec/row-batch-list-test.cc
+++ b/be/src/exec/row-batch-list-test.cc
@@ -26,7 +26,7 @@
 #include "runtime/string-value.h"
 #include "runtime/tuple-row.h"
 #include "util/cpu-info.h"
-#include "util/runtime-profile.h"
+#include "util/runtime-profile-counters.h"
 #include "testutil/desc-tbl-builder.h"
 
 #include "common/names.h"

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/exec/scan-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/scan-node.cc b/be/src/exec/scan-node.cc
index 8527ee8..6e58a72 100644
--- a/be/src/exec/scan-node.cc
+++ b/be/src/exec/scan-node.cc
@@ -16,6 +16,8 @@
 
 #include <boost/bind.hpp>
 
+#include "util/runtime-profile-counters.h"
+
 #include "common/names.h"
 
 namespace impala {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/exec/scanner-context.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/scanner-context.cc b/be/src/exec/scanner-context.cc
index 897f8c5..833436c 100644
--- a/be/src/exec/scanner-context.cc
+++ b/be/src/exec/scanner-context.cc
@@ -22,6 +22,7 @@
 #include "runtime/runtime-state.h"
 #include "runtime/string-buffer.h"
 #include "util/debug-util.h"
+#include "util/runtime-profile-counters.h"
 
 #include "common/names.h"
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/exec/scanner-context.h
----------------------------------------------------------------------
diff --git a/be/src/exec/scanner-context.h b/be/src/exec/scanner-context.h
index 22e0928..e1dee97 100644
--- a/be/src/exec/scanner-context.h
+++ b/be/src/exec/scanner-context.h
@@ -23,7 +23,6 @@
 #include "common/status.h"
 #include "exec/filter-context.h"
 #include "runtime/disk-io-mgr.h"
-#include "runtime/row-batch.h"
 
 namespace impala {
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/exec/select-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/select-node.cc b/be/src/exec/select-node.cc
index 97407f8..6c95268 100644
--- a/be/src/exec/select-node.cc
+++ b/be/src/exec/select-node.cc
@@ -16,6 +16,8 @@
 #include "exprs/expr.h"
 #include "runtime/row-batch.h"
 #include "runtime/runtime-state.h"
+#include "runtime/raw-value.h"
+#include "util/runtime-profile-counters.h"
 #include "gen-cpp/PlanNodes_types.h"
 
 #include "common/names.h"

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/exec/sort-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/sort-node.cc b/be/src/exec/sort-node.cc
index 2ce0245..3ceaf00 100644
--- a/be/src/exec/sort-node.cc
+++ b/be/src/exec/sort-node.cc
@@ -17,6 +17,7 @@
 #include "runtime/row-batch.h"
 #include "runtime/runtime-state.h"
 #include "runtime/sorted-run-merger.h"
+#include "util/runtime-profile-counters.h"
 
 #include "common/names.h"
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/exec/subplan-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/subplan-node.cc b/be/src/exec/subplan-node.cc
index 3b946a9..f52a844 100644
--- a/be/src/exec/subplan-node.cc
+++ b/be/src/exec/subplan-node.cc
@@ -18,6 +18,7 @@
 #include "exec/unnest-node.h"
 #include "runtime/row-batch.h"
 #include "runtime/runtime-state.h"
+#include "util/runtime-profile-counters.h"
 
 namespace impala {
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/exec/text-converter.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/text-converter.cc b/be/src/exec/text-converter.cc
index 8e71ef4..5768231 100644
--- a/be/src/exec/text-converter.cc
+++ b/be/src/exec/text-converter.cc
@@ -23,6 +23,7 @@
 #include "runtime/tuple.h"
 #include "text-converter.h"
 #include "util/string-parser.h"
+#include "util/runtime-profile-counters.h"
 
 #include "common/names.h"
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/exec/topn-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/topn-node.cc b/be/src/exec/topn-node.cc
index a73c3b5..a2b8e06 100644
--- a/be/src/exec/topn-node.cc
+++ b/be/src/exec/topn-node.cc
@@ -25,7 +25,7 @@
 #include "runtime/tuple.h"
 #include "runtime/tuple-row.h"
 #include "util/debug-util.h"
-#include "util/runtime-profile.h"
+#include "util/runtime-profile-counters.h"
 
 #include "gen-cpp/Exprs_types.h"
 #include "gen-cpp/PlanNodes_types.h"

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/exec/union-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/union-node.cc b/be/src/exec/union-node.cc
index 6694f12..42d9b97 100644
--- a/be/src/exec/union-node.cc
+++ b/be/src/exec/union-node.cc
@@ -18,6 +18,8 @@
 #include "runtime/row-batch.h"
 #include "runtime/runtime-state.h"
 #include "runtime/raw-value.h"
+#include "runtime/tuple-row.h"
+#include "util/runtime-profile-counters.h"
 #include "gen-cpp/PlanNodes_types.h"
 
 #include "common/names.h"

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/exec/unnest-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/unnest-node.cc b/be/src/exec/unnest-node.cc
index e31c938..68846b8 100644
--- a/be/src/exec/unnest-node.cc
+++ b/be/src/exec/unnest-node.cc
@@ -17,8 +17,10 @@
 #include "exec/subplan-node.h"
 #include "exprs/expr-context.h"
 #include "exprs/slot-ref.h"
+#include "runtime/row-batch.h"
 #include "runtime/runtime-state.h"
-#include "util/runtime-profile.h"
+#include "runtime/tuple-row.h"
+#include "util/runtime-profile-counters.h"
 
 namespace impala {
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/experiments/data-provider.cc
----------------------------------------------------------------------
diff --git a/be/src/experiments/data-provider.cc b/be/src/experiments/data-provider.cc
index f17b72a..fa98b80 100644
--- a/be/src/experiments/data-provider.cc
+++ b/be/src/experiments/data-provider.cc
@@ -5,6 +5,8 @@
 #include <math.h>
 #include <iostream>
 
+#include "util/runtime-profile-counters.h"
+
 #include "common/names.h"
 
 using boost::minstd_rand;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/experiments/tuple-splitter-test.cc
----------------------------------------------------------------------
diff --git a/be/src/experiments/tuple-splitter-test.cc b/be/src/experiments/tuple-splitter-test.cc
index c458c66..efc6ca9 100644
--- a/be/src/experiments/tuple-splitter-test.cc
+++ b/be/src/experiments/tuple-splitter-test.cc
@@ -24,7 +24,7 @@
 #include "runtime/mem-tracker.h"
 #include "runtime/string-value.h"
 #include "util/cpu-info.h"
-#include "util/runtime-profile.h"
+#include "util/runtime-profile-counters.h"
 
 #define PRINT_RESULT 0
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/exprs/agg-fn-evaluator.h
----------------------------------------------------------------------
diff --git a/be/src/exprs/agg-fn-evaluator.h b/be/src/exprs/agg-fn-evaluator.h
index e9598ea..77ecd47 100644
--- a/be/src/exprs/agg-fn-evaluator.h
+++ b/be/src/exprs/agg-fn-evaluator.h
@@ -200,7 +200,7 @@ class AggFnEvaluator {
   impala_udf::AnyVal* staging_merge_input_val_;
 
   /// Cache entry for the library containing the function ptrs.
-  LibCache::LibCacheEntry* cache_entry_;
+  LibCacheEntry* cache_entry_;
 
   /// Function ptrs for the different phases of the aggregate function.
   void* init_fn_;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/exprs/compound-predicates.cc
----------------------------------------------------------------------
diff --git a/be/src/exprs/compound-predicates.cc b/be/src/exprs/compound-predicates.cc
index ce78f1f..3ee7f8a 100644
--- a/be/src/exprs/compound-predicates.cc
+++ b/be/src/exprs/compound-predicates.cc
@@ -12,11 +12,15 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+#include <sstream>
+
 #include "exprs/compound-predicates.h"
 #include "codegen/codegen-anyval.h"
 #include "codegen/llvm-codegen.h"
 #include "runtime/runtime-state.h"
 
+#include "common/names.h"
+
 using namespace impala;
 using namespace llvm;
 
@@ -33,6 +37,12 @@ BooleanVal AndPredicate::GetBooleanVal(ExprContext* context, TupleRow* row) {
   return BooleanVal(true);
 }
 
+string AndPredicate::DebugString() const {
+  stringstream out;
+  out << "AndPredicate(" << Expr::DebugString() << ")";
+  return out.str();
+}
+
 // (<> || true) is true, (false || NULL) is NULL
 BooleanVal OrPredicate::GetBooleanVal(ExprContext* context, TupleRow* row) {
   DCHECK_EQ(children_.size(), 2);
@@ -46,6 +56,12 @@ BooleanVal OrPredicate::GetBooleanVal(ExprContext* context, TupleRow* row) {
   return BooleanVal(false);
 }
 
+string OrPredicate::DebugString() const {
+  stringstream out;
+  out << "OrPredicate(" << Expr::DebugString() << ")";
+  return out.str();
+}
+
 // IR codegen for compound and/or predicates.  Compound predicate has non trivial 
 // null handling as well as many branches so this is pretty complicated.  The IR 
 // for x && y is:

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/exprs/compound-predicates.h
----------------------------------------------------------------------
diff --git a/be/src/exprs/compound-predicates.h b/be/src/exprs/compound-predicates.h
index df4fe67..b391831 100644
--- a/be/src/exprs/compound-predicates.h
+++ b/be/src/exprs/compound-predicates.h
@@ -47,11 +47,7 @@ class AndPredicate: public CompoundPredicate {
   friend class Expr;
   AndPredicate(const TExprNode& node) : CompoundPredicate(node) { }
 
-  virtual std::string DebugString() const {
-    std::stringstream out;
-    out << "AndPredicate(" << Expr::DebugString() << ")";
-    return out.str();
-  }
+  virtual std::string DebugString() const;
 
  private:
   friend class OpcodeRegistry;
@@ -70,11 +66,7 @@ class OrPredicate: public CompoundPredicate {
   friend class Expr;
   OrPredicate(const TExprNode& node) : CompoundPredicate(node) { }
 
-  virtual std::string DebugString() const {
-    std::stringstream out;
-    out << "OrPredicate(" << Expr::DebugString() << ")";
-    return out.str();
-  }
+  virtual std::string DebugString() const;
 
  private:
   friend class OpcodeRegistry;
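
The compound-predicates hunks move DebugString() bodies out of the header and into the .cc file, so the header no longer needs <sstream>. The sketch below shows that pattern in one file. Expr and AndPredicate are simplified, hypothetical stand-ins, not Impala's real classes, and the "Expr" return value is a placeholder for the real base-class output.

```cpp
#include <string>

// Header portion: declarations only, so this part needs <string> but not <sstream>.
class Expr {
 public:
  virtual ~Expr() {}
  virtual std::string DebugString() const;
};

class AndPredicate : public Expr {
 public:
  virtual std::string DebugString() const;  // defined out of line below
};

// .cc portion: the only place that pulls in <sstream>.
#include <sstream>

// Placeholder base output (hypothetical; the real Expr prints its children).
std::string Expr::DebugString() const { return "Expr"; }

std::string AndPredicate::DebugString() const {
  std::stringstream out;
  out << "AndPredicate(" << Expr::DebugString() << ")";
  return out.str();
}
```

With these stand-ins, AndPredicate().DebugString() yields "AndPredicate(Expr)". Files that include only the header no longer pay for <sstream>.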

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/exprs/expr-benchmark.cc
----------------------------------------------------------------------
diff --git a/be/src/exprs/expr-benchmark.cc b/be/src/exprs/expr-benchmark.cc
index 5439e4e..91f89b5 100644
--- a/be/src/exprs/expr-benchmark.cc
+++ b/be/src/exprs/expr-benchmark.cc
@@ -26,6 +26,7 @@
 #include "rpc/thrift-server.h"
 #include "common/object-pool.h"
 #include "common/status.h"
+#include "runtime/mem-tracker.h"
 #include "service/impala-server.h"
 
 #include "common/names.h"

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/exprs/expr-codegen-test.cc
----------------------------------------------------------------------
diff --git a/be/src/exprs/expr-codegen-test.cc b/be/src/exprs/expr-codegen-test.cc
index ec81b30..c660bfd 100644
--- a/be/src/exprs/expr-codegen-test.cc
+++ b/be/src/exprs/expr-codegen-test.cc
@@ -47,6 +47,7 @@ IntVal TestGetConstant(
 #include "common/init.h"
 #include "exprs/expr-context.h"
 #include "runtime/exec-env.h"
+#include "runtime/mem-tracker.h"
 #include "runtime/runtime-state.h"
 #include "service/fe-support.h"
 #include "udf/udf-internal.h"

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/exprs/expr-context.cc
----------------------------------------------------------------------
diff --git a/be/src/exprs/expr-context.cc b/be/src/exprs/expr-context.cc
index 2a01bfa..4420394 100644
--- a/be/src/exprs/expr-context.cc
+++ b/be/src/exprs/expr-context.cc
@@ -16,6 +16,7 @@
 
 #include <sstream>
 
+#include "common/object-pool.h"
 #include "exprs/expr.h"
 #include "runtime/decimal-value.inline.h"
 #include "runtime/mem-pool.h"

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/exprs/expr-test.cc
----------------------------------------------------------------------
diff --git a/be/src/exprs/expr-test.cc b/be/src/exprs/expr-test.cc
index 09f60cb..ed60016 100644
--- a/be/src/exprs/expr-test.cc
+++ b/be/src/exprs/expr-test.cc
@@ -39,6 +39,7 @@
 #include "gen-cpp/hive_metastore_types.h"
 #include "rpc/thrift-client.h"
 #include "rpc/thrift-server.h"
+#include "runtime/mem-tracker.h"
 #include "runtime/raw-value.inline.h"
 #include "runtime/string-value.h"
 #include "runtime/timestamp-value.h"

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/exprs/expr.cc
----------------------------------------------------------------------
diff --git a/be/src/exprs/expr.cc b/be/src/exprs/expr.cc
index 29c8c91..0156edc 100644
--- a/be/src/exprs/expr.cc
+++ b/be/src/exprs/expr.cc
@@ -57,6 +57,9 @@
 #include "gen-cpp/Data_types.h"
 #include "runtime/lib-cache.h"
 #include "runtime/runtime-state.h"
+#include "runtime/raw-value.h"
+#include "runtime/tuple.h"
+#include "runtime/tuple-row.h"
 #include "udf/udf.h"
 #include "udf/udf-internal.h"
 
@@ -728,4 +731,10 @@ Status Expr::GetFnContextError(ExprContext* ctx) {
   return Status::OK();
 }
 
+string Expr::DebugString(const string& expr_name) const {
+  stringstream out;
+  out << expr_name << "(" << Expr::DebugString() << ")";
+  return out.str();
+}
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/exprs/expr.h
----------------------------------------------------------------------
diff --git a/be/src/exprs/expr.h b/be/src/exprs/expr.h
index e7e12b2..e0c55b6 100644
--- a/be/src/exprs/expr.h
+++ b/be/src/exprs/expr.h
@@ -85,18 +85,14 @@
 #ifndef IMPALA_EXPRS_EXPR_H
 #define IMPALA_EXPRS_EXPR_H
 
+#include <boost/scoped_ptr.hpp>
 #include <string>
 #include <vector>
 
+#include "common/global-types.h"
 #include "common/status.h"
 #include "impala-ir/impala-ir-functions.h"
-#include "runtime/descriptors.h"
-#include "runtime/decimal-value.h"
-#include "runtime/lib-cache.h"
-#include "runtime/tuple.h"
-#include "runtime/tuple-row.h"
-#include "runtime/string-value.h"
-#include "runtime/timestamp-value.h"
+#include "runtime/types.h"
 #include "udf/udf.h"
 #include "udf/udf-internal.h" // for CollectionVal
 
@@ -111,15 +107,19 @@ namespace llvm {
 
 namespace impala {
 
-class Expr;
+class ExprContext;
 class IsNullExpr;
+class LibCacheEntry;
 class LlvmCodeGen;
+class MemTracker;
 class ObjectPool;
 class RowDescriptor;
 class RuntimeState;
 class TColumnValue;
 class TExpr;
 class TExprNode;
+class Tuple;
+class TupleRow;
 
 /// This is the superclass of all expr evaluation nodes.
 class Expr {
@@ -343,7 +343,7 @@ class Expr {
       FunctionContext::FunctionStateScope scope = FunctionContext::FRAGMENT_LOCAL);
 
   /// Cache entry for the library implementing this function.
-  LibCache::LibCacheEntry* cache_entry_;
+  LibCacheEntry* cache_entry_;
 
   /// Function description.
   TFunction fn_;
@@ -399,11 +399,7 @@ class Expr {
   int InlineConstants(LlvmCodeGen* codegen, llvm::Function* fn);
 
   /// Simple debug string that provides no expr subclass-specific information
-  std::string DebugString(const std::string& expr_name) const {
-    std::stringstream out;
-    out << expr_name << "(" << Expr::DebugString() << ")";
-    return out.str();
-  }
+  std::string DebugString(const std::string& expr_name) const;
 
  private:
   friend class ExprContext;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/exprs/in-predicate-ir.cc
----------------------------------------------------------------------
diff --git a/be/src/exprs/in-predicate-ir.cc b/be/src/exprs/in-predicate-ir.cc
index 61d39ef..80883c7 100644
--- a/be/src/exprs/in-predicate-ir.cc
+++ b/be/src/exprs/in-predicate-ir.cc
@@ -17,6 +17,7 @@
 #include "exprs/in-predicate.h"
 
 #include "exprs/anyval-util.h"
+#include "runtime/decimal-value.h"
 #include "runtime/string-value.inline.h"
 
 #include "common/names.h"

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/exprs/math-functions-ir.cc
----------------------------------------------------------------------
diff --git a/be/src/exprs/math-functions-ir.cc b/be/src/exprs/math-functions-ir.cc
index ce08ba8..344db59 100644
--- a/be/src/exprs/math-functions-ir.cc
+++ b/be/src/exprs/math-functions-ir.cc
@@ -27,10 +27,6 @@
 
 #include "common/names.h"
 
-using boost::algorithm::is_any_of;
-using boost::algorithm::join;
-using boost::algorithm::split;
-using boost::algorithm::to_lower;
 using std::uppercase;
 
 namespace impala {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/exprs/slot-ref.cc
----------------------------------------------------------------------
diff --git a/be/src/exprs/slot-ref.cc b/be/src/exprs/slot-ref.cc
index b075fb2..59c2b78 100644
--- a/be/src/exprs/slot-ref.cc
+++ b/be/src/exprs/slot-ref.cc
@@ -20,8 +20,12 @@
 #include "codegen/llvm-codegen.h"
 #include "gen-cpp/Exprs_types.h"
 #include "runtime/collection-value.h"
+#include "runtime/decimal-value.h"
+#include "runtime/multi-precision.h"
 #include "runtime/runtime-state.h"
 #include "runtime/string-value.inline.h"
+#include "runtime/timestamp-value.h"
+#include "runtime/tuple-row.h"
 
 #include "common/names.h"
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/exprs/slot-ref.h
----------------------------------------------------------------------
diff --git a/be/src/exprs/slot-ref.h b/be/src/exprs/slot-ref.h
index 4708df5..a00adbf 100644
--- a/be/src/exprs/slot-ref.h
+++ b/be/src/exprs/slot-ref.h
@@ -16,6 +16,7 @@
 #define IMPALA_EXPRS_SLOTREF_H
 
 #include "exprs/expr.h"
+#include "runtime/descriptors.h"
 
 namespace impala {
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/exprs/tuple-is-null-predicate.cc
----------------------------------------------------------------------
diff --git a/be/src/exprs/tuple-is-null-predicate.cc b/be/src/exprs/tuple-is-null-predicate.cc
index ff20471..ba96138 100644
--- a/be/src/exprs/tuple-is-null-predicate.cc
+++ b/be/src/exprs/tuple-is-null-predicate.cc
@@ -19,6 +19,8 @@
 #include "gen-cpp/Exprs_types.h"
 
 #include "common/names.h"
+#include "runtime/descriptors.h"
+#include "runtime/tuple-row.h"
 
 namespace impala {
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/runtime/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/be/src/runtime/CMakeLists.txt b/be/src/runtime/CMakeLists.txt
index 6c15330..0498125 100644
--- a/be/src/runtime/CMakeLists.txt
+++ b/be/src/runtime/CMakeLists.txt
@@ -48,6 +48,7 @@ add_library(Runtime
   raw-value-ir.cc
   row-batch.cc
   runtime-filter.cc
+  runtime-filter-bank.cc
   runtime-state.cc
   sorted-run-merger.cc
   sorter.cc

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/runtime/backend-client.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/backend-client.h b/be/src/runtime/backend-client.h
index 28e8348..d0783ea 100644
--- a/be/src/runtime/backend-client.h
+++ b/be/src/runtime/backend-client.h
@@ -16,7 +16,7 @@
 #define IMPALA_BACKEND_CLIENT_H
 
 #include "runtime/client-cache.h"
-#include "util/runtime-profile.h"
+#include "util/runtime-profile-counters.h"
 
 #include "gen-cpp/ImpalaInternalService.h"
 
@@ -63,9 +63,6 @@ class ImpalaBackendClient : public ImpalaInternalServiceClient {
   RuntimeProfile::ConcurrentTimerCounter* transmit_csw_;
 };
 
-class ImpalaBackendClient;
-typedef ClientCache<ImpalaBackendClient> ImpalaBackendClientCache;
-typedef ClientConnection<ImpalaBackendClient> ImpalaBackendConnection;
 }
 
 #endif // IMPALA_BACKEND_CLIENT_H

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/runtime/buffered-block-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/buffered-block-mgr.cc b/be/src/runtime/buffered-block-mgr.cc
index b11054c..8265ea2 100644
--- a/be/src/runtime/buffered-block-mgr.cc
+++ b/be/src/runtime/buffered-block-mgr.cc
@@ -17,7 +17,7 @@
 #include "runtime/mem-pool.h"
 #include "runtime/buffered-block-mgr.h"
 #include "runtime/tmp-file-mgr.h"
-#include "util/runtime-profile.h"
+#include "util/runtime-profile-counters.h"
 #include "util/disk-info.h"
 #include "util/filesystem-util.h"
 #include "util/impalad-metrics.h"

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/runtime/buffered-block-mgr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/buffered-block-mgr.h b/be/src/runtime/buffered-block-mgr.h
index 3efa634..9e54c4f 100644
--- a/be/src/runtime/buffered-block-mgr.h
+++ b/be/src/runtime/buffered-block-mgr.h
@@ -592,7 +592,7 @@ class BufferedBlockMgr {
 
   /// DiskIoMgr handles to read and write blocks.
   DiskIoMgr* io_mgr_;
-  DiskIoMgr::RequestContext* io_request_context_;
+  DiskIoRequestContext* io_request_context_;
 
   /// If true, a disk write failed and all API calls return.
   /// Status::CANCELLED. Set to true if there was an error writing a block, or if
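
The rename from DiskIoMgr::RequestContext to DiskIoRequestContext (and likewise LibCache::LibCacheEntry to LibCacheEntry) appears to serve forward declaration. A namespace-scope class can be forward-declared, but a nested class can only be declared inside its enclosing class. This is a minimal sketch. BlockMgrSketch and num_ranges are hypothetical, and the class body is a toy.

```cpp
// Consumer header: a forward declaration is enough to hold a pointer.
class DiskIoRequestContext;
// By contrast, "class DiskIoMgr::RequestContext;" is ill-formed: a nested class
// cannot be forward-declared outside its enclosing class, so a consumer would
// have to include the full DiskIoMgr definition just to name the type.

// Hypothetical stand-in for BufferedBlockMgr's member.
struct BlockMgrSketch {
  DiskIoRequestContext* io_request_context_ = nullptr;  // pointer to incomplete type is fine
};

// Full definition, normally provided by the I/O manager's own header (toy body).
class DiskIoRequestContext {
 public:
  int num_ranges = 0;
};
```

Only code that dereferences the pointer needs the full definition. Headers that merely store it can rely on the one-line declaration.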

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/runtime/buffered-tuple-stream.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/buffered-tuple-stream.cc b/be/src/runtime/buffered-tuple-stream.cc
index eb35a69..96e2d0c 100644
--- a/be/src/runtime/buffered-tuple-stream.cc
+++ b/be/src/runtime/buffered-tuple-stream.cc
@@ -24,6 +24,7 @@
 #include "runtime/tuple-row.h"
 #include "util/bit-util.h"
 #include "util/debug-util.h"
+#include "util/runtime-profile-counters.h"
 
 #include "common/names.h"
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/runtime/client-cache-types.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/client-cache-types.h b/be/src/runtime/client-cache-types.h
new file mode 100644
index 0000000..70a82bf
--- /dev/null
+++ b/be/src/runtime/client-cache-types.h
@@ -0,0 +1,42 @@
+// Copyright 2016 Cloudera Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#ifndef IMPALA_RUNTIME_CLIENT_CACHE_DECLS_H
+#define IMPALA_RUNTIME_CLIENT_CACHE_DECLS_H
+
+namespace impala {
+
+/// Forward declarations for client cache types to avoid including the full class
+/// declaration indirectly in many places where it is not needed.
+template<class T>
+class ClientCache;
+
+template<class T>
+class ClientConnection;
+
+/// Common cache / connection types
+class ImpalaInternalServiceClient;
+typedef ClientCache<ImpalaInternalServiceClient> ImpalaInternalServiceClientCache;
+typedef ClientConnection<ImpalaInternalServiceClient> ImpalaInternalServiceConnection;
+
+class CatalogServiceClient;
+typedef ClientCache<CatalogServiceClient> CatalogServiceClientCache;
+typedef ClientConnection<CatalogServiceClient> CatalogServiceConnection;
+
+class ImpalaBackendClient;
+typedef ClientCache<ImpalaBackendClient> ImpalaBackendClientCache;
+typedef ClientConnection<ImpalaBackendClient> ImpalaBackendConnection;
+}
+
+#endif
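
client-cache-types.h works because naming a class template specialization in a typedef, or declaring a pointer to one, does not instantiate the template. The sketch below illustrates this. The ClientCache body, Get(), name(), and UsesCache are hypothetical toys, not the real ClientCache API.

```cpp
#include <string>

// Forward declarations, in the style of client-cache-types.h.
template <class T>
class ClientCache;
class CatalogServiceClient;
typedef ClientCache<CatalogServiceClient> CatalogServiceClientCache;

// A consumer that only stores a pointer compiles against the declarations alone.
struct UsesCache {  // hypothetical consumer
  CatalogServiceClientCache* cache = nullptr;
};

// Full definitions, normally in client-cache.h and the generated client (toy bodies).
class CatalogServiceClient {
 public:
  std::string name() const { return "catalog"; }
};

template <class T>
class ClientCache {
 public:
  T* Get() { return &client_; }  // hypothetical accessor
 private:
  T client_;
};
```

The template is instantiated only where a CatalogServiceClientCache object is actually created, which by then has the full definitions in scope.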

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/runtime/client-cache.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/client-cache.h b/be/src/runtime/client-cache.h
index 7ca24a2..ff38286 100644
--- a/be/src/runtime/client-cache.h
+++ b/be/src/runtime/client-cache.h
@@ -22,6 +22,7 @@
 #include <boost/thread/mutex.hpp>
 #include <boost/bind.hpp>
 
+#include "runtime/client-cache-types.h"
 #include "util/metrics.h"
 #include "rpc/thrift-client.h"
 #include "rpc/thrift-util.h"
@@ -183,9 +184,6 @@ class ClientCacheHelper {
       ClientKey* client_key);
 };
 
-template<class T>
-class ClientCache;
-
 /// A scoped client connection to help manage clients from a client cache. Clients of this
 /// class should use DoRpc() to actually make RPC calls.
 template<class T>

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/runtime/collection-value-builder-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/collection-value-builder-test.cc b/be/src/runtime/collection-value-builder-test.cc
index e86737b..b3dfc55 100644
--- a/be/src/runtime/collection-value-builder-test.cc
+++ b/be/src/runtime/collection-value-builder-test.cc
@@ -15,6 +15,7 @@
 #include <gtest/gtest.h>
 
 #include "runtime/collection-value-builder.h"
+#include "runtime/mem-tracker.h"
 #include "testutil/desc-tbl-builder.h"
 #include "testutil/gtest-util.h"
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/runtime/coordinator.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc
index fd27a51..7cb6c97 100644
--- a/be/src/runtime/coordinator.cc
+++ b/be/src/runtime/coordinator.cc
@@ -43,10 +43,12 @@
 #include "runtime/data-stream-mgr.h"
 #include "runtime/exec-env.h"
 #include "runtime/hdfs-fs-cache.h"
+#include "runtime/mem-tracker.h"
 #include "runtime/plan-fragment-executor.h"
 #include "runtime/row-batch.h"
 #include "runtime/backend-client.h"
 #include "runtime/parallel-executor.h"
+#include "runtime/tuple-row.h"
 #include "scheduling/scheduler.h"
 #include "exec/data-sink.h"
 #include "exec/scan-node.h"

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/runtime/coordinator.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator.h b/be/src/runtime/coordinator.h
index 048523c..578c61d 100644
--- a/be/src/runtime/coordinator.h
+++ b/be/src/runtime/coordinator.h
@@ -28,10 +28,10 @@
 #include <boost/accumulators/statistics/variance.hpp>
 #include <boost/unordered_map.hpp>
 #include <boost/unordered_set.hpp>
-#include <boost/thread/thread.hpp>
 #include <boost/thread/mutex.hpp>
 #include <boost/thread/condition_variable.hpp>
 
+#include "common/hdfs.h"
 #include "common/status.h"
 #include "common/global-types.h"
 #include "util/progress-updater.h"

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/runtime/data-stream-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/data-stream-mgr.cc b/be/src/runtime/data-stream-mgr.cc
index ae41a36..b366a70 100644
--- a/be/src/runtime/data-stream-mgr.cc
+++ b/be/src/runtime/data-stream-mgr.cc
@@ -25,6 +25,7 @@
 #include "runtime/runtime-state.h"
 #include "util/debug-util.h"
 #include "util/periodic-counter-updater.h"
+#include "util/runtime-profile-counters.h"
 #include "util/uid-util.h"
 
 #include "gen-cpp/ImpalaInternalService.h"

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/runtime/data-stream-mgr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/data-stream-mgr.h b/be/src/runtime/data-stream-mgr.h
index 05da622..21f8336 100644
--- a/be/src/runtime/data-stream-mgr.h
+++ b/be/src/runtime/data-stream-mgr.h
@@ -27,7 +27,7 @@
 #include "common/status.h"
 #include "common/object-pool.h"
 #include "runtime/descriptors.h"  // for PlanNodeId
-#include "runtime/mem-tracker.h"
+#include "util/metrics.h"
 #include "util/promise.h"
 #include "util/runtime-profile.h"
 #include "gen-cpp/Types_types.h"  // for TUniqueId

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/runtime/data-stream-recvr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/data-stream-recvr.cc b/be/src/runtime/data-stream-recvr.cc
index 516aef6..10b0803 100644
--- a/be/src/runtime/data-stream-recvr.cc
+++ b/be/src/runtime/data-stream-recvr.cc
@@ -17,9 +17,10 @@
 
 #include "runtime/data-stream-recvr.h"
 #include "runtime/data-stream-mgr.h"
+#include "runtime/mem-tracker.h"
 #include "runtime/row-batch.h"
 #include "runtime/sorted-run-merger.h"
-#include "util/runtime-profile.h"
+#include "util/runtime-profile-counters.h"
 #include "util/periodic-counter-updater.h"
 
 #include "common/names.h"

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/runtime/data-stream-sender.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/data-stream-sender.cc b/be/src/runtime/data-stream-sender.cc
index 79f7d64..0a10963 100644
--- a/be/src/runtime/data-stream-sender.cc
+++ b/be/src/runtime/data-stream-sender.cc
@@ -31,6 +31,7 @@
 #include "runtime/backend-client.h"
 #include "util/debug-util.h"
 #include "util/network-util.h"
+#include "util/thread-pool.h"
 #include "rpc/thrift-client.h"
 #include "rpc/thrift-util.h"
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/runtime/data-stream-sender.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/data-stream-sender.h b/be/src/runtime/data-stream-sender.h
index 7a93933..6f66aa7 100644
--- a/be/src/runtime/data-stream-sender.h
+++ b/be/src/runtime/data-stream-sender.h
@@ -29,6 +29,7 @@
 namespace impala {
 
 class Expr;
+class ExprContext;
 class RowBatch;
 class RowDescriptor;
 class MemTracker;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/runtime/data-stream-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/data-stream-test.cc b/be/src/runtime/data-stream-test.cc
index 2341c17..54b38b1 100644
--- a/be/src/runtime/data-stream-test.cc
+++ b/be/src/runtime/data-stream-test.cc
@@ -29,8 +29,9 @@
 #include "runtime/data-stream-recvr.h"
 #include "runtime/descriptors.h"
 #include "runtime/client-cache.h"
-#include "runtime/raw-value.inline.h"
 #include "runtime/backend-client.h"
+#include "runtime/mem-tracker.h"
+#include "runtime/raw-value.inline.h"
 #include "service/fe-support.h"
 #include "util/cpu-info.h"
 #include "util/disk-info.h"

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/runtime/descriptors.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/descriptors.h b/be/src/runtime/descriptors.h
index 18d835c..3c67c7e 100644
--- a/be/src/runtime/descriptors.h
+++ b/be/src/runtime/descriptors.h
@@ -16,7 +16,6 @@
 #ifndef IMPALA_RUNTIME_DESCRIPTORS_H
 #define IMPALA_RUNTIME_DESCRIPTORS_H
 
-#include <vector>
 #include <tr1/unordered_map>
 #include <vector>
 #include <boost/scoped_ptr.hpp>

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/runtime/disk-io-mgr-internal.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr-internal.h b/be/src/runtime/disk-io-mgr-internal.h
index e58a3f3..6994cfe 100644
--- a/be/src/runtime/disk-io-mgr-internal.h
+++ b/be/src/runtime/disk-io-mgr-internal.h
@@ -22,6 +22,7 @@
 #include <gutil/strings/substitute.h>
 
 #include "common/logging.h"
+#include "runtime/disk-io-mgr.h"
 #include "runtime/mem-tracker.h"
 #include "runtime/thread-resource-mgr.h"
 #include "util/cpu-info.h"
@@ -50,10 +51,10 @@ struct DiskIoMgr::DiskQueue {
   boost::condition_variable work_available;
 
   /// list of all request contexts that have work queued on this disk
-  std::list<RequestContext*> request_contexts;
+  std::list<DiskIoRequestContext*> request_contexts;
 
   /// Enqueue the request context to the disk queue.  The DiskQueue lock must not be taken.
-  inline void EnqueueContext(RequestContext* worker) {
+  inline void EnqueueContext(DiskIoRequestContext* worker) {
     {
       boost::unique_lock<boost::mutex> disk_lock(lock);
       /// Check that the reader is not already on the queue
@@ -75,14 +76,14 @@ struct DiskIoMgr::DiskQueue {
 /// A scan range for the reader is on one of five states:
 /// 1) PerDiskState's unstarted_ranges: This range has only been queued
 ///    and nothing has been read from it.
-/// 2) RequestContext's ready_to_start_ranges_: This range is about to be started.
+/// 2) DiskIoRequestContext's ready_to_start_ranges_: This range is about to be started.
 ///    As soon as the reader picks it up, it will move to the in_flight_ranges
 ///    queue.
 /// 3) PerDiskState's in_flight_ranges: This range is being processed and will
 ///    be read from the next time a disk thread picks it up in GetNextRequestRange()
 /// 4) ScanRange's outgoing ready buffers is full. We can't read for this range
 ///    anymore. We need the caller to pull a buffer off which will put this in
-///    the in_flight_ranges queue. These ranges are in the RequestContext's
+///    the in_flight_ranges queue. These ranges are in the DiskIoRequestContext's
 ///    blocked_ranges_ queue.
 /// 5) ScanRange is cached and in the cached_ranges_ queue.
 //
@@ -111,15 +112,21 @@ struct DiskIoMgr::DiskQueue {
 /// the entire range is written when the write request is handled. (In other words, writes
 /// are not broken up.)
 //
-/// When a RequestContext is processed by a disk thread in GetNextRequestRange(), a write
-/// range is always removed from the list of unstarted write ranges and appended to the
-/// in_flight_ranges_ queue. This is done to alternate reads and writes - a read that is
-/// scheduled (by calling GetNextRange()) is always followed by a write (if one exists).
-/// And since at most one WriteRange can be present in in_flight_ranges_ at any time
-/// (once a write range is returned from GetNetxRequestRange() it is completed and not
-/// re-enqueued), a scan range scheduled via a call to GetNextRange() can be queued up
+/// When a DiskIoRequestContext is processed by a disk thread in GetNextRequestRange(),
+/// a write range is always removed from the list of unstarted write ranges and appended
+/// to the in_flight_ranges_ queue. This is done to alternate reads and writes - a read
+/// that is scheduled (by calling GetNextRange()) is always followed by a write (if one
+/// exists).  And since at most one WriteRange can be present in in_flight_ranges_ at any
+/// time (once a write range is returned from GetNextRequestRange() it is completed and
+/// not re-enqueued), a scan range scheduled via a call to GetNextRange() can be queued up
 /// behind at most one write range.
-class DiskIoMgr::RequestContext {
+class DiskIoRequestContext {
+  using DiskQueue = DiskIoMgr::DiskQueue;
+  using RequestRange = DiskIoMgr::RequestRange;
+  using ScanRange = DiskIoMgr::ScanRange;
+  using WriteRange = DiskIoMgr::WriteRange;
+  using RequestType = DiskIoMgr::RequestType;
+
  public:
   enum State {
     /// Reader is initialized and maps to a client
@@ -135,7 +142,7 @@ class DiskIoMgr::RequestContext {
     Inactive,
   };
 
-  RequestContext(DiskIoMgr* parent, int num_disks);
+  DiskIoRequestContext(DiskIoMgr* parent, int num_disks);
 
   /// Resets this object.
   void Reset(MemTracker* tracker);
@@ -162,10 +169,10 @@ class DiskIoMgr::RequestContext {
   /// Adds range to in_flight_ranges, scheduling this reader on the disk threads
   /// if necessary.
   /// Reader lock must be taken before this.
-  void ScheduleScanRange(DiskIoMgr::ScanRange* range) {
+  void ScheduleScanRange(ScanRange* range) {
     DCHECK_EQ(state_, Active);
     DCHECK(range != NULL);
-    RequestContext::PerDiskState& state = disk_states_[range->disk_id()];
+    DiskIoRequestContext::PerDiskState& state = disk_states_[range->disk_id()];
     state.in_flight_ranges()->Enqueue(range);
     state.ScheduleContext(this, range->disk_id());
   }
@@ -175,7 +182,7 @@ class DiskIoMgr::RequestContext {
 
   /// Adds request range to disk queue for this request context. Currently,
   /// schedule_immediately must be false is RequestRange is a write range.
-  void AddRequestRange(DiskIoMgr::RequestRange* range, bool schedule_immediately);
+  void AddRequestRange(RequestRange* range, bool schedule_immediately);
 
   /// Returns the default queue capacity for scan ranges. This is updated
   /// as the reader processes ranges.
@@ -338,11 +345,15 @@ class DiskIoMgr::RequestContext {
       return &in_flight_ranges_;
     }
 
-    InternalQueue<ScanRange>* unstarted_scan_ranges() { return &unstarted_scan_ranges_; }
+    InternalQueue<ScanRange>* unstarted_scan_ranges() {
+      return &unstarted_scan_ranges_;
+    }
     InternalQueue<WriteRange>* unstarted_write_ranges() {
       return &unstarted_write_ranges_;
     }
-    InternalQueue<RequestRange>* in_flight_ranges() { return &in_flight_ranges_; }
+    InternalQueue<RequestRange>* in_flight_ranges() {
+      return &in_flight_ranges_;
+    }
 
     PerDiskState() {
       Reset();
@@ -350,7 +361,7 @@ class DiskIoMgr::RequestContext {
 
     /// Schedules the request context on this disk if it's not already on the queue.
     /// Context lock must be taken before this.
-    void ScheduleContext(RequestContext* context, int disk_id) {
+    void ScheduleContext(DiskIoRequestContext* context, int disk_id) {
       if (!is_on_queue_ && !done_) {
         is_on_queue_ = true;
         context->parent_->disk_queues_[disk_id]->EnqueueContext(context);
@@ -371,7 +382,7 @@ class DiskIoMgr::RequestContext {
 
     /// Decrement request thread count and do final cleanup if this is the last
     /// thread. RequestContext lock must be taken before this.
-    void DecrementRequestThreadAndCheckDone(RequestContext* context) {
+    void DecrementRequestThreadAndCheckDone(DiskIoRequestContext* context) {
       num_threads_in_op_.Add(-1); // Also acts as a barrier.
       if (!is_on_queue_ && num_threads_in_op_.Load() == 0 && !done_) {
         // This thread is the last one for this reader on this disk, do final cleanup
@@ -416,7 +427,7 @@ class DiskIoMgr::RequestContext {
     /// For each disks, the number of request ranges that have not been fully read.
     /// In the non-cancellation path, this will hit 0, and done will be set to true
     /// by the disk thread. This is undefined in the cancellation path (the various
-    /// threads notice by looking at the RequestContext's state_).
+    /// threads notice by looking at the DiskIoRequestContext's state_).
     int num_remaining_ranges_;
 
     /// Queue of ranges that have not started being read.  This list is exclusive
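Why the rename from DiskIoMgr::RequestContext to a top-level DiskIoRequestContext helps: a nested class can only be declared inside its enclosing class's definition. Any header that names DiskIoMgr::RequestContext therefore has to include disk-io-mgr.h. runtime-state.h did exactly that ("for DiskIoMgr::RequestContext") and now uses "class DiskIoRequestContext;" instead. The sketch below uses hypothetical names (Mgr, MgrRequestContext, StateLike) rather than Impala's real classes. It also shows how the using-aliases added to DiskIoRequestContext keep the old short names of the enclosing class's nested types working after the move.

```cpp
#include <cassert>
#include <cstddef>
#include <list>

// Hypothetical names for illustration; these are not Impala's classes.
// Because the context class lives at namespace scope, a plain forward
// declaration is enough for code that only stores pointers to it.
class MgrRequestContext;

// Stand-in for a widely included header such as runtime-state.h.
struct StateLike {
  std::list<MgrRequestContext*> reader_contexts;
};

class Mgr {
 public:
  struct ScanRange {
    int disk_id;
  };
};

class MgrRequestContext {
  // Aliases let the moved class keep using the short names it had while it
  // was nested inside Mgr.
  using ScanRange = Mgr::ScanRange;

 public:
  explicit MgrRequestContext(Mgr* parent) : parent_(parent) {
    assert(parent != nullptr);
  }
  int DiskOf(const ScanRange& range) const { return range.disk_id; }
  Mgr* parent() const { return parent_; }

 private:
  Mgr* parent_;
};

// Usage helper: registers a context and returns how many are tracked.
inline std::size_t Register(StateLike* state, MgrRequestContext* ctx) {
  assert(ctx != nullptr);
  state->reader_contexts.push_back(ctx);
  return state->reader_contexts.size();
}
```

The trade-off is naming only: the class loses its DiskIoMgr:: scope, and the aliases recover the convenience of that scope inside the class body.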



[2/4] incubator-impala git commit: Refactor RuntimeState and ExecEnv dependencies

Posted by ta...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/runtime/runtime-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-state.cc b/be/src/runtime/runtime-state.cc
index 8c423c0..c970163 100644
--- a/be/src/runtime/runtime-state.cc
+++ b/be/src/runtime/runtime-state.cc
@@ -12,6 +12,8 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+#include "runtime/runtime-state.h"
+
 #include <iostream>
 #include <jni.h>
 #include <sstream>
@@ -27,11 +29,11 @@
 #include "exprs/expr.h"
 #include "runtime/buffered-block-mgr.h"
 #include "runtime/descriptors.h"
-#include "runtime/runtime-state.h"
-#include "runtime/timestamp-value.h"
 #include "runtime/data-stream-mgr.h"
 #include "runtime/data-stream-recvr.h"
-#include "runtime/runtime-filter.h"
+#include "runtime/mem-tracker.h"
+#include "runtime/runtime-filter-bank.h"
+#include "runtime/timestamp-value.h"
 #include "util/bitmap.h"
 #include "util/cpu-info.h"
 #include "util/debug-util.h"
@@ -75,7 +77,7 @@ RuntimeState::RuntimeState(const TExecPlanFragmentParams& fragment_params,
     is_cancelled_(false),
     query_resource_mgr_(NULL),
     root_node_id_(-1),
-    filter_bank_(fragment_ctx().query_ctx, this) {
+    filter_bank_(new RuntimeFilterBank(fragment_ctx().query_ctx, this)) {
   Status status = Init(exec_env);
   DCHECK(status.ok()) << status.GetDetail();
 }
@@ -90,7 +92,7 @@ RuntimeState::RuntimeState(const TQueryCtx& query_ctx)
     is_cancelled_(false),
     query_resource_mgr_(NULL),
     root_node_id_(-1),
-    filter_bank_(query_ctx, this) {
+    filter_bank_(new RuntimeFilterBank(query_ctx, this)) {
   fragment_params_.fragment_instance_ctx.__set_query_ctx(query_ctx);
   fragment_params_.fragment_instance_ctx.query_ctx.request.query_options
       .__set_batch_size(DEFAULT_BATCH_SIZE);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/runtime/runtime-state.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-state.h b/be/src/runtime/runtime-state.h
index 860692f..15c8d9c 100644
--- a/be/src/runtime/runtime-state.h
+++ b/be/src/runtime/runtime-state.h
@@ -16,40 +16,34 @@
 #ifndef IMPALA_RUNTIME_RUNTIME_STATE_H
 #define IMPALA_RUNTIME_RUNTIME_STATE_H
 
-/// needed for scoped_ptr to work on ObjectPool
-#include "common/object-pool.h"
-
 #include <boost/scoped_ptr.hpp>
 #include <boost/shared_ptr.hpp>
 #include <vector>
 #include <string>
-/// stringstream is a typedef, so can't forward declare it.
-#include <sstream>
 
-#include "scheduling/query-resource-mgr.h"
+// NOTE: try not to add more headers here: runtime-state.h is included in many many files.
+#include "common/global-types.h"  // for PlanNodeId
+#include "runtime/client-cache-types.h"
 #include "runtime/exec-env.h"
-#include "runtime/descriptors.h"  // for PlanNodeId
-#include "runtime/disk-io-mgr.h"  // for DiskIoMgr::RequestContext
-#include "runtime/mem-tracker.h"
-#include "runtime/runtime-filter.h"
 #include "runtime/thread-resource-mgr.h"
-#include "gen-cpp/PlanNodes_types.h"
-#include "gen-cpp/Types_types.h"  // for TUniqueId
-#include "gen-cpp/ImpalaInternalService_types.h"  // for TQueryOptions
-#include "util/auth-util.h"
+#include "util/auth-util.h" // for GetEffectiveUser()
 #include "util/runtime-profile.h"
 
 namespace impala {
 
 class BufferedBlockMgr;
+class DataStreamRecvr;
 class DescriptorTbl;
-class ObjectPool;
-class Status;
-class ExecEnv;
+class DiskIoRequestContext;
 class Expr;
 class LlvmCodeGen;
+class MemTracker;
+class ObjectPool;
+class RuntimeFilterBank;
+class Status;
 class TimestampValue;
-class DataStreamRecvr;
+class TQueryOptions;
+class TUniqueId;
 
 /// Counts how many rows an INSERT query has added to a particular partition
 /// (partitions are identified by their partition keys: k1=v1/k2=v2
@@ -140,7 +134,7 @@ class RuntimeState {
   ThreadResourceMgr::ResourcePool* resource_pool() { return resource_pool_; }
 
   FileMoveMap* hdfs_files_to_move() { return &hdfs_files_to_move_; }
-  std::vector<DiskIoMgr::RequestContext*>* reader_contexts() { return &reader_contexts_; }
+  std::vector<DiskIoRequestContext*>* reader_contexts() { return &reader_contexts_; }
 
   void set_fragment_root_id(PlanNodeId id) {
     DCHECK_EQ(root_node_id_, -1) << "Should not set this twice.";
@@ -151,7 +145,7 @@ class RuntimeState {
   /// See comment on root_node_id_. We add one to prevent having a hash seed of 0.
   uint32_t fragment_hash_seed() const { return root_node_id_ + 1; }
 
-  RuntimeFilterBank* filter_bank() { return &filter_bank_; }
+  RuntimeFilterBank* filter_bank() { return filter_bank_.get(); }
 
   PartitionStatusMap* per_partition_status() { return &per_partition_status_; }
 
@@ -348,7 +342,7 @@ class RuntimeState {
   QueryResourceMgr* query_resource_mgr_;
 
   /// Reader contexts that need to be closed when the fragment is closed.
-  std::vector<DiskIoMgr::RequestContext*> reader_contexts_;
+  std::vector<DiskIoRequestContext*> reader_contexts_;
 
   /// BufferedBlockMgr object used to allocate and manage blocks of input data in memory
   /// with a fixed memory budget.
@@ -366,7 +360,7 @@ class RuntimeState {
 
   /// Manages runtime filters that are either produced or consumed (or both!) by plan
   /// nodes that share this runtime state.
-  RuntimeFilterBank filter_bank_;
+  boost::scoped_ptr<RuntimeFilterBank> filter_bank_;
 
   /// prohibit copies
   RuntimeState(const RuntimeState&);
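The change from "RuntimeFilterBank filter_bank_;" to "boost::scoped_ptr<RuntimeFilterBank> filter_bank_;" lets runtime-state.h drop "#include runtime/runtime-filter.h" in favor of "class RuntimeFilterBank;". A by-value member needs the complete type in the header. A smart-pointer member only needs it where the owner's constructor and destructor are defined, since that is where the pointee gets deleted. boost::scoped_ptr enforces this through checked_delete. The sketch below uses std::unique_ptr, which has the same requirement, and hypothetical names (StateLike, FilterBankLike). It keeps the "header" and ".cc" parts in one file for brevity. Whether RuntimeState's destructor is defined out of line is not shown in this diff; the sketch simply assumes that arrangement.

```cpp
#include <cassert>
#include <memory>

// ---- would live in the header (hypothetical names) ----
class FilterBankLike;  // forward declaration only; no #include needed

class StateLike {
 public:
  explicit StateLike(int query_id);
  ~StateLike();  // declared here, defined where FilterBankLike is complete
  // get() does not need the complete type, so this accessor can stay inline.
  FilterBankLike* filter_bank() { return filter_bank_.get(); }

 private:
  std::unique_ptr<FilterBankLike> filter_bank_;
};

// ---- would live in the .cc file ----
class FilterBankLike {
 public:
  explicit FilterBankLike(int query_id) : query_id_(query_id) {}
  int query_id() const { return query_id_; }

 private:
  int query_id_;
};

// The constructor may destroy filter_bank_ if an exception is thrown, so it
// is also defined after FilterBankLike is complete.
StateLike::StateLike(int query_id) : filter_bank_(new FilterBankLike(query_id)) {
  assert(filter_bank_ != nullptr);
}

// The deleter needs the complete type; defining the destructor here satisfies it.
StateLike::~StateLike() = default;
```

The cost is one extra heap allocation and a pointer indirection per RuntimeState. In return, files that include runtime-state.h no longer recompile when the filter bank's header changes. This matches the header's own note about keeping its includes minimal.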

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/runtime/sorted-run-merger.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/sorted-run-merger.cc b/be/src/runtime/sorted-run-merger.cc
index 0376437..328d1c2 100644
--- a/be/src/runtime/sorted-run-merger.cc
+++ b/be/src/runtime/sorted-run-merger.cc
@@ -18,7 +18,7 @@
 #include "runtime/row-batch.h"
 #include "runtime/sorter.h"
 #include "runtime/tuple-row.h"
-#include "util/runtime-profile.h"
+#include "util/runtime-profile-counters.h"
 
 #include "common/names.h"
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/runtime/sorter.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/sorter.cc b/be/src/runtime/sorter.cc
index e0d388a..299dd2f 100644
--- a/be/src/runtime/sorter.cc
+++ b/be/src/runtime/sorter.cc
@@ -22,7 +22,7 @@
 #include "runtime/row-batch.h"
 #include "runtime/runtime-state.h"
 #include "runtime/sorted-run-merger.h"
-#include "util/runtime-profile.h"
+#include "util/runtime-profile-counters.h"
 
 #include "common/names.h"
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/runtime/string-buffer.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/string-buffer.h b/be/src/runtime/string-buffer.h
index c8f79df..ec59806 100644
--- a/be/src/runtime/string-buffer.h
+++ b/be/src/runtime/string-buffer.h
@@ -18,6 +18,7 @@
 
 #include "common/status.h"
 #include "runtime/mem-pool.h"
+#include "runtime/mem-tracker.h"
 #include "runtime/string-value.h"
 
 using namespace strings;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/runtime/thread-resource-mgr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/thread-resource-mgr.h b/be/src/runtime/thread-resource-mgr.h
index cb2c13e..00aaeba 100644
--- a/be/src/runtime/thread-resource-mgr.h
+++ b/be/src/runtime/thread-resource-mgr.h
@@ -18,10 +18,7 @@
 #include <stdlib.h>
 
 #include <boost/function.hpp>
-#include <boost/scoped_ptr.hpp>
-#include <boost/shared_ptr.hpp>
 #include <boost/thread/mutex.hpp>
-#include <boost/thread/thread.hpp>
 
 #include <list>
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/runtime/tuple.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/tuple.cc b/be/src/runtime/tuple.cc
index fe85948..93e63f3 100644
--- a/be/src/runtime/tuple.cc
+++ b/be/src/runtime/tuple.cc
@@ -29,6 +29,7 @@
 #include "runtime/string-value.h"
 #include "runtime/tuple-row.h"
 #include "util/debug-util.h"
+#include "util/runtime-profile-counters.h"
 
 #include "common/names.h"
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/scheduling/admission-controller.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/admission-controller.cc b/be/src/scheduling/admission-controller.cc
index 70174a1..94e21b8 100644
--- a/be/src/scheduling/admission-controller.cc
+++ b/be/src/scheduling/admission-controller.cc
@@ -25,7 +25,7 @@
 #include "runtime/mem-tracker.h"
 #include "util/debug-util.h"
 #include "util/time.h"
-#include "util/runtime-profile.h"
+#include "util/runtime-profile-counters.h"
 #include "util/pretty-printer.h"
 
 #include "common/names.h"

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/scheduling/query-resource-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/query-resource-mgr.cc b/be/src/scheduling/query-resource-mgr.cc
index 5687269..bf42329 100644
--- a/be/src/scheduling/query-resource-mgr.cc
+++ b/be/src/scheduling/query-resource-mgr.cc
@@ -22,6 +22,7 @@
 #include "runtime/exec-env.h"
 #include "resourcebroker/resource-broker.h"
 #include "util/bit-util.h"
+#include "util/cgroups-mgr.h"
 #include "util/container-util.h"
 #include "util/network-util.h"
 #include "util/promise.h"

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/scheduling/simple-scheduler.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/simple-scheduler.cc b/be/src/scheduling/simple-scheduler.cc
index 7abc916..a14509c 100644
--- a/be/src/scheduling/simple-scheduler.cc
+++ b/be/src/scheduling/simple-scheduler.cc
@@ -24,6 +24,7 @@
 
 #include "common/logging.h"
 #include "util/metrics.h"
+#include "resourcebroker/resource-broker.h"
 #include "runtime/exec-env.h"
 #include "runtime/coordinator.h"
 #include "service/impala-server.h"

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/service/fragment-exec-state.cc
----------------------------------------------------------------------
diff --git a/be/src/service/fragment-exec-state.cc b/be/src/service/fragment-exec-state.cc
index 680e295..38866f1 100644
--- a/be/src/service/fragment-exec-state.cc
+++ b/be/src/service/fragment-exec-state.cc
@@ -20,6 +20,7 @@
 #include "gen-cpp/ImpalaInternalService.h"
 #include "rpc/thrift-util.h"
 #include "gutil/strings/substitute.h"
+#include "runtime/runtime-filter-bank.h"
 #include "util/bloom-filter.h"
 #include "runtime/backend-client.h"
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/service/fragment-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/service/fragment-mgr.cc b/be/src/service/fragment-mgr.cc
index 5210f8c..f64ca7e 100644
--- a/be/src/service/fragment-mgr.cc
+++ b/be/src/service/fragment-mgr.cc
@@ -20,6 +20,7 @@
 
 #include "service/fragment-exec-state.h"
 #include "runtime/exec-env.h"
+#include "runtime/mem-tracker.h"
 #include "util/impalad-metrics.h"
 #include "util/uid-util.h"
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/service/impala-beeswax-server.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-beeswax-server.cc b/be/src/service/impala-beeswax-server.cc
index 478c1ea..35cb945 100644
--- a/be/src/service/impala-beeswax-server.cc
+++ b/be/src/service/impala-beeswax-server.cc
@@ -41,6 +41,7 @@
 #include "runtime/plan-fragment-executor.h"
 #include "runtime/hdfs-fs-cache.h"
 #include "runtime/exec-env.h"
+#include "runtime/mem-tracker.h"
 #include "runtime/raw-value.inline.h"
 #include "runtime/timestamp-value.h"
 #include "scheduling/simple-scheduler.h"

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/service/impala-hs2-server.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-hs2-server.cc b/be/src/service/impala-hs2-server.cc
index 1dc4982..ce5df38 100644
--- a/be/src/service/impala-hs2-server.cc
+++ b/be/src/service/impala-hs2-server.cc
@@ -31,11 +31,12 @@
 #include "common/logging.h"
 #include "common/version.h"
 #include "exprs/expr.h"
+#include "rpc/thrift-util.h"
 #include "runtime/raw-value.h"
 #include "service/query-exec-state.h"
 #include "service/query-options.h"
 #include "util/debug-util.h"
-#include "rpc/thrift-util.h"
+#include "util/runtime-profile-counters.h"
 #include "util/impalad-metrics.h"
 #include "util/string-parser.h"
 #include "service/hs2-util.h"

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/service/query-exec-state.cc
----------------------------------------------------------------------
diff --git a/be/src/service/query-exec-state.cc b/be/src/service/query-exec-state.cc
index b534902..ec70c47 100644
--- a/be/src/service/query-exec-state.cc
+++ b/be/src/service/query-exec-state.cc
@@ -18,6 +18,8 @@
 
 #include "exprs/expr.h"
 #include "exprs/expr-context.h"
+#include "resourcebroker/resource-broker.h"
+#include "runtime/mem-tracker.h"
 #include "runtime/row-batch.h"
 #include "runtime/runtime-state.h"
 #include "service/impala-server.h"
@@ -25,6 +27,7 @@
 #include "service/query-options.h"
 #include "util/debug-util.h"
 #include "util/impalad-metrics.h"
+#include "util/runtime-profile-counters.h"
 #include "util/time.h"
 
 #include "gen-cpp/CatalogService.h"

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/statestore/statestore-subscriber.h
----------------------------------------------------------------------
diff --git a/be/src/statestore/statestore-subscriber.h b/be/src/statestore/statestore-subscriber.h
index 3808459..ddf663e 100644
--- a/be/src/statestore/statestore-subscriber.h
+++ b/be/src/statestore/statestore-subscriber.h
@@ -21,23 +21,22 @@
 #include <boost/scoped_ptr.hpp>
 #include <boost/shared_ptr.hpp>
 #include <boost/thread/mutex.hpp>
-#include <boost/thread/thread.hpp>
 
 #include "statestore/statestore.h"
 #include "util/stopwatch.h"
 #include "rpc/thrift-util.h"
 #include "rpc/thrift-client.h"
-#include "util/thread.h"
 #include "util/metrics.h"
 #include "gen-cpp/StatestoreService.h"
 #include "gen-cpp/StatestoreSubscriber.h"
 
 namespace impala {
 
-class TimeoutFailureDetector;
 class Status;
-class TNetworkAddress;
+class TimeoutFailureDetector;
+class Thread;
 class ThriftServer;
+class TNetworkAddress;
 
 typedef ClientCache<StatestoreServiceClient> StatestoreClientCache;
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/testutil/desc-tbl-builder.cc
----------------------------------------------------------------------
diff --git a/be/src/testutil/desc-tbl-builder.cc b/be/src/testutil/desc-tbl-builder.cc
index 358ae54..3b3994a 100644
--- a/be/src/testutil/desc-tbl-builder.cc
+++ b/be/src/testutil/desc-tbl-builder.cc
@@ -15,7 +15,7 @@
 #include "testutil/desc-tbl-builder.h"
 #include "util/bit-util.h"
 
-
+#include "common/object-pool.h"
 #include "runtime/descriptors.h"
 
 #include "common/names.h"

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/testutil/desc-tbl-builder.h
----------------------------------------------------------------------
diff --git a/be/src/testutil/desc-tbl-builder.h b/be/src/testutil/desc-tbl-builder.h
index ec816ee..fdaf67e 100644
--- a/be/src/testutil/desc-tbl-builder.h
+++ b/be/src/testutil/desc-tbl-builder.h
@@ -16,6 +16,7 @@
 #define IMPALA_TESTUTIL_ROW_DESC_BUILDER_H_
 
 #include "runtime/runtime-state.h"
+#include "runtime/types.h"
 
 namespace impala {
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/util/auth-util.cc
----------------------------------------------------------------------
diff --git a/be/src/util/auth-util.cc b/be/src/util/auth-util.cc
index e75a250..afe037c 100644
--- a/be/src/util/auth-util.cc
+++ b/be/src/util/auth-util.cc
@@ -14,6 +14,8 @@
 
 #include "util/auth-util.h"
 
+#include "gen-cpp/ImpalaInternalService_types.h"
+
 using namespace std;
 
 namespace impala {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/util/auth-util.h
----------------------------------------------------------------------
diff --git a/be/src/util/auth-util.h b/be/src/util/auth-util.h
index e44bf70..d82c61a 100644
--- a/be/src/util/auth-util.h
+++ b/be/src/util/auth-util.h
@@ -18,16 +18,16 @@
 
 #include <string>
 
-#include "gen-cpp/ImpalaInternalService_types.h"
-
 namespace impala {
 
-  /// Returns a reference to the "effective user" from the specified session. Queries
-  /// are run and authorized on behalf of the effective user. When a delegated_user is
-  /// specified (is not empty), the effective user is the delegated_user. This is because
-  /// the connected_user is acting as a "proxy user" for the delegated_user. When
-  /// delegated_user is empty, the effective user is the connected user.
-  const std::string& GetEffectiveUser(const TSessionState& session);
+class TSessionState;
+
+/// Returns a reference to the "effective user" from the specified session. Queries
+/// are run and authorized on behalf of the effective user. When a delegated_user is
+/// specified (is not empty), the effective user is the delegated_user. This is because
+/// the connected_user is acting as a "proxy user" for the delegated_user. When
+/// delegated_user is empty, the effective user is the connected user.
+const std::string& GetEffectiveUser(const TSessionState& session);
 
 } // namespace impala
 #endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/util/avro-util.cc
----------------------------------------------------------------------
diff --git a/be/src/util/avro-util.cc b/be/src/util/avro-util.cc
index b591f23..5cce8a3 100644
--- a/be/src/util/avro-util.cc
+++ b/be/src/util/avro-util.cc
@@ -12,10 +12,11 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-#include <avro/schema.h>
-
 #include "util/avro-util.h"
 
+#include <avro/schema.h>
+#include <sstream>
+
 using namespace std;
 
 namespace impala {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/util/codec.h
----------------------------------------------------------------------
diff --git a/be/src/util/codec.h b/be/src/util/codec.h
index afe3791..a337983 100644
--- a/be/src/util/codec.h
+++ b/be/src/util/codec.h
@@ -17,8 +17,6 @@
 #define IMPALA_UTIL_CODEC_H
 
 #include "common/status.h"
-#include "runtime/mem-pool.h"
-#include "util/runtime-profile.h"
 
 #include <boost/scoped_ptr.hpp>
 #include "gen-cpp/Descriptors_types.h"

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/util/decompress.cc
----------------------------------------------------------------------
diff --git a/be/src/util/decompress.cc b/be/src/util/decompress.cc
index 588aff6..8a2ea74 100644
--- a/be/src/util/decompress.cc
+++ b/be/src/util/decompress.cc
@@ -15,6 +15,7 @@
 #include <boost/assign/list_of.hpp>
 #include "util/decompress.h"
 #include "exec/read-write-util.h"
+#include "runtime/mem-tracker.h"
 #include "runtime/runtime-state.h"
 #include "common/logging.h"
 #include "gen-cpp/Descriptors_types.h"

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/util/dict-encoding.h
----------------------------------------------------------------------
diff --git a/be/src/util/dict-encoding.h b/be/src/util/dict-encoding.h
index 7a48886..795a6b1 100644
--- a/be/src/util/dict-encoding.h
+++ b/be/src/util/dict-encoding.h
@@ -24,7 +24,6 @@
 #include "runtime/mem-pool.h"
 #include "runtime/string-value.h"
 #include "util/rle-encoding.h"
-#include "util/runtime-profile.h"
 
 namespace impala {
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/util/hdfs-bulk-ops-defs.h
----------------------------------------------------------------------
diff --git a/be/src/util/hdfs-bulk-ops-defs.h b/be/src/util/hdfs-bulk-ops-defs.h
new file mode 100644
index 0000000..13e1be0
--- /dev/null
+++ b/be/src/util/hdfs-bulk-ops-defs.h
@@ -0,0 +1,31 @@
+// Copyright 2016 Cloudera Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#ifndef IMPALA_UTIL_HDFS_BULK_OPS_DEFS_H
+#define IMPALA_UTIL_HDFS_BULK_OPS_DEFS_H
+
+namespace impala {
+
+/// Forward declarations for HDFS ops.
+template <typename T>
+class ThreadPool;
+
+class HdfsOp;
+class HdfsOperationSet;
+
+typedef ThreadPool<HdfsOp> HdfsOpThreadPool;
+
+}
+
+#endif
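hdfs-bulk-ops-defs.h works because naming a specialization of a class template, as in "typedef ThreadPool<HdfsOp> HdfsOpThreadPool;", does not instantiate it. Only the declaration "template <typename T> class ThreadPool;" is needed, so headers that merely pass HdfsOpThreadPool* around can presumably avoid including util/thread-pool.h and util/hdfs-bulk-ops.h. The sketch below mirrors that layout with hypothetical names (PoolLike, OpLike, OpPool, Owner, SubmitAll). It is not Impala's ThreadPool API.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <utility>
#include <vector>

// ---- "defs" header: declarations only (hypothetical names) ----
template <typename T>
class PoolLike;  // class template declared, not defined
class OpLike;
typedef PoolLike<OpLike> OpPool;  // naming the specialization does not instantiate it

// Code that only stores an OpPool* compiles against the declarations alone.
struct Owner {
  OpPool* pool = nullptr;
};

// ---- full definitions (would live in the heavier headers) ----
class OpLike {
 public:
  explicit OpLike(std::string src) : src_(std::move(src)) {}
  const std::string& src() const { return src_; }

 private:
  std::string src_;
};

template <typename T>
class PoolLike {
 public:
  void Offer(const T& work) { queue_.push_back(work); }
  std::size_t size() const { return queue_.size(); }

 private:
  std::vector<T> queue_;
};

// Usage helper defined once the full definitions are visible; calling
// members through owner->pool requires PoolLike<OpLike> to be complete.
inline std::size_t SubmitAll(Owner* owner, const std::vector<OpLike>& ops) {
  assert(owner->pool != nullptr);
  for (const OpLike& op : ops) owner->pool->Offer(op);
  return owner->pool->size();
}
```

Putting the typedef next to the forward declarations, rather than in hdfs-bulk-ops.h, keeps the one spelling of HdfsOpThreadPool in a header that costs nothing to include.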

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/util/hdfs-bulk-ops.h
----------------------------------------------------------------------
diff --git a/be/src/util/hdfs-bulk-ops.h b/be/src/util/hdfs-bulk-ops.h
index e66f0d1..97feaa0 100644
--- a/be/src/util/hdfs-bulk-ops.h
+++ b/be/src/util/hdfs-bulk-ops.h
@@ -21,6 +21,7 @@
 #include "common/hdfs.h"
 #include "common/atomic.h"
 #include "common/status.h"
+#include "util/hdfs-bulk-ops-defs.h"
 #include "util/thread-pool.h"
 #include "util/counting-barrier.h"
 #include "runtime/hdfs-fs-cache.h"
@@ -36,8 +37,6 @@ enum HdfsOpType {
   CHMOD
 };
 
-class HdfsOperationSet;
-
 /// Container class that encapsulates a single HDFS operation. Used only internally by
 /// HdfsOperationSet, but visible because it parameterises HdfsOpThreadPool.
 class HdfsOp {
@@ -81,8 +80,6 @@ class HdfsOp {
   void AddError(const string& error_msg) const;
 };
 
-typedef ThreadPool<HdfsOp> HdfsOpThreadPool;
-
 /// Creates a new HdfsOp-processing thread pool.
 HdfsOpThreadPool* CreateHdfsOpThreadPool(const std::string& name, uint32_t num_threads,
     uint32_t max_queue_length);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/util/metrics.h
----------------------------------------------------------------------
diff --git a/be/src/util/metrics.h b/be/src/util/metrics.h
index ab4b6c2..9ddaa48 100644
--- a/be/src/util/metrics.h
+++ b/be/src/util/metrics.h
@@ -24,8 +24,8 @@
 #include <boost/thread/locks.hpp>
 
 #include "common/logging.h"
-#include "common/status.h"
 #include "common/object-pool.h"
+#include "common/status.h"
 #include "util/debug-util.h"
 #include "util/json-util.h"
 #include "util/pretty-printer.h"

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/util/periodic-counter-updater.cc
----------------------------------------------------------------------
diff --git a/be/src/util/periodic-counter-updater.cc b/be/src/util/periodic-counter-updater.cc
index a3979d8..d1cbac5 100644
--- a/be/src/util/periodic-counter-updater.cc
+++ b/be/src/util/periodic-counter-updater.cc
@@ -14,6 +14,7 @@
 
 #include "util/periodic-counter-updater.h"
 
+#include "util/runtime-profile-counters.h"
 #include "util/time.h"
 
 #include "common/names.h"

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/util/runtime-profile-counters.h
----------------------------------------------------------------------
diff --git a/be/src/util/runtime-profile-counters.h b/be/src/util/runtime-profile-counters.h
new file mode 100644
index 0000000..4b51c4f
--- /dev/null
+++ b/be/src/util/runtime-profile-counters.h
@@ -0,0 +1,488 @@
+// Copyright 2016 Cloudera Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+
+#ifndef IMPALA_UTIL_RUNTIME_PROFILE_COUNTERS_H
+#define IMPALA_UTIL_RUNTIME_PROFILE_COUNTERS_H
+
+#include <boost/scoped_ptr.hpp>
+#include <boost/unordered_map.hpp>
+#include <sys/time.h>
+#include <sys/resource.h>
+
+#include "common/atomic.h"
+#include "common/logging.h"
+#include "util/runtime-profile.h"
+#include "util/stopwatch.h"
+#include "util/streaming-sampler.h"
+
+namespace impala {
+
+/// Define macros for updating counters.  The macros make it very easy to disable
+/// all counters at compile time.  Set this to 0 to remove counters.  This is useful
+/// for verifying that the counters aren't affecting the system.
+#define ENABLE_COUNTERS 1
+
+/// Some macro magic to generate unique ids using __COUNTER__
+#define CONCAT_IMPL(x, y) x##y
+#define MACRO_CONCAT(x, y) CONCAT_IMPL(x, y)
+
+#if ENABLE_COUNTERS
+  #define ADD_COUNTER(profile, name, unit) (profile)->AddCounter(name, unit)
+  #define ADD_TIME_SERIES_COUNTER(profile, name, src_counter) \
+      (profile)->AddTimeSeriesCounter(name, src_counter)
+  #define ADD_TIMER(profile, name) (profile)->AddCounter(name, TUnit::TIME_NS)
+  #define ADD_CHILD_TIMER(profile, name, parent) \
+      (profile)->AddCounter(name, TUnit::TIME_NS, parent)
+  #define SCOPED_TIMER(c) \
+      ScopedTimer<MonotonicStopWatch> MACRO_CONCAT(SCOPED_TIMER, __COUNTER__)(c)
+  #define CANCEL_SAFE_SCOPED_TIMER(c, is_cancelled) \
+      ScopedTimer<MonotonicStopWatch> MACRO_CONCAT(SCOPED_TIMER, __COUNTER__)(c, is_cancelled)
+  #define COUNTER_ADD(c, v) (c)->Add(v)
+  #define COUNTER_SET(c, v) (c)->Set(v)
+  #define ADD_THREAD_COUNTERS(profile, prefix) (profile)->AddThreadCounters(prefix)
+  #define SCOPED_THREAD_COUNTER_MEASUREMENT(c) \
+    ThreadCounterMeasurement \
+      MACRO_CONCAT(SCOPED_THREAD_COUNTER_MEASUREMENT, __COUNTER__)(c)
+  #define SCOPED_CONCURRENT_COUNTER(c) \
+    ScopedStopWatch<RuntimeProfile::ConcurrentTimerCounter> \
+      MACRO_CONCAT(SCOPED_CONCURRENT_COUNTER, __COUNTER__)(c)
+#else
+  #define ADD_COUNTER(profile, name, unit) NULL
+  #define ADD_TIME_SERIES_COUNTER(profile, name, src_counter) NULL
+  #define ADD_TIMER(profile, name) NULL
+  #define ADD_CHILD_TIMER(profile, name, parent) NULL
+  #define SCOPED_TIMER(c)
+  #define CANCEL_SAFE_SCOPED_TIMER(c, is_cancelled)
+  #define COUNTER_ADD(c, v)
+  #define COUNTER_SET(c, v)
+  #define ADD_THREAD_COUNTERS(profile, prefix) NULL
+  #define SCOPED_THREAD_COUNTER_MEASUREMENT(c)
+  #define SCOPED_CONCURRENT_COUNTER(c)
+#endif
+
+/// A counter that keeps track of the highest value seen (reporting that
+/// as value()) and the current value.
+class RuntimeProfile::HighWaterMarkCounter : public RuntimeProfile::Counter {
+ public:
+  HighWaterMarkCounter(TUnit::type unit) : Counter(unit) {}
+
+  virtual void Add(int64_t delta) {
+    int64_t new_val = current_value_.Add(delta);
+    UpdateMax(new_val);
+  }
+
+  /// Tries to increase the current value by delta. If current_value() + delta
+  /// exceeds max, return false and current_value is not changed.
+  bool TryAdd(int64_t delta, int64_t max) {
+    while (true) {
+      int64_t old_val = current_value_.Load();
+      int64_t new_val = old_val + delta;
+      if (UNLIKELY(new_val > max)) return false;
+      if (LIKELY(current_value_.CompareAndSwap(old_val, new_val))) {
+        UpdateMax(new_val);
+        return true;
+      }
+    }
+  }
+
+  virtual void Set(int64_t v) {
+    current_value_.Store(v);
+    UpdateMax(v);
+  }
+
+  int64_t current_value() const { return current_value_.Load(); }
+
+ private:
+  /// Set 'value_' to 'v' if 'v' is larger than 'value_'. The entire operation is
+  /// atomic.
+  void UpdateMax(int64_t v) {
+    while (true) {
+      int64_t old_max = value_.Load();
+      int64_t new_max = std::max(old_max, v);
+      if (new_max == old_max) break; // Avoid atomic update.
+      if (LIKELY(value_.CompareAndSwap(old_max, new_max))) break;
+    }
+  }
+
+  /// The current value of the counter. value_ in the super class represents
+  /// the high water mark.
+  AtomicInt64 current_value_;
+};
+
+/// A DerivedCounter also has a name and unit, but the value is computed.
+/// Do not call Set() and Add().
+class RuntimeProfile::DerivedCounter : public RuntimeProfile::Counter {
+ public:
+  DerivedCounter(TUnit::type unit, const DerivedCounterFunction& counter_fn)
+    : Counter(unit),
+      counter_fn_(counter_fn) {}
+
+  virtual int64_t value() const {
+    return counter_fn_();
+  }
+
+ private:
+  DerivedCounterFunction counter_fn_;
+};
+
+/// An AveragedCounter maintains a set of counters and its value is the
+/// average of the values in that set. The average is updated through calls
+/// to UpdateCounter(), which may add a new counter or update an existing counter.
+/// Set() and Add() should not be called.
+class RuntimeProfile::AveragedCounter : public RuntimeProfile::Counter {
+ public:
+  AveragedCounter(TUnit::type unit)
+   : Counter(unit),
+     current_double_sum_(0.0),
+     current_int_sum_(0) {
+  }
+
+  /// Update counter_value_map_ with the new counter. This may require the counter
+  /// to be added to the map.
+  /// No locks are obtained within this class because UpdateCounter() is called from
+  /// UpdateAverage(), which obtains locks on the entire counter map in a profile.
+  void UpdateCounter(Counter* new_counter) {
+    DCHECK_EQ(new_counter->unit_, unit_);
+    boost::unordered_map<Counter*, int64_t>::iterator it =
+        counter_value_map_.find(new_counter);
+    int64_t old_val = 0;
+    if (it != counter_value_map_.end()) {
+      old_val = it->second;
+      it->second = new_counter->value();
+    } else {
+      counter_value_map_[new_counter] = new_counter->value();
+    }
+
+    if (unit_ == TUnit::DOUBLE_VALUE) {
+      double old_double_val = *reinterpret_cast<double*>(&old_val);
+      current_double_sum_ += (new_counter->double_value() - old_double_val);
+      double result_val = current_double_sum_ / (double) counter_value_map_.size();
+      value_.Store(*reinterpret_cast<int64_t*>(&result_val));
+    } else {
+      current_int_sum_ += (new_counter->value() - old_val);
+      value_.Store(current_int_sum_ / counter_value_map_.size());
+    }
+  }
+
+  /// The value for this counter should be updated through UpdateCounter().
+  /// Set() and Add() should not be used.
+  virtual void Set(double value) {
+    DCHECK(false);
+  }
+
+  virtual void Set(int64_t value) {
+    DCHECK(false);
+  }
+
+  virtual void Add(int64_t delta) {
+    DCHECK(false);
+  }
+
+ private:
+  /// Map from counters to their existing values. Modified via UpdateCounter().
+  boost::unordered_map<Counter*, int64_t> counter_value_map_;
+
+  /// Current sums of values from counter_value_map_. Only one of these is used,
+  /// depending on the unit of the counter. current_double_sum_ is used for
+  /// DOUBLE_VALUE, current_int_sum_ otherwise.
+  double current_double_sum_;
+  int64_t current_int_sum_;
+};
+
+/// A set of counters that measure thread info, such as total time, user time, sys time.
+class RuntimeProfile::ThreadCounters {
+ private:
+  friend class ThreadCounterMeasurement;
+  friend class RuntimeProfile;
+
+  Counter* total_time_; // total wall clock time
+  Counter* user_time_;  // user CPU time
+  Counter* sys_time_;   // system CPU time
+
+  /// The number of times a context switch resulted due to a process voluntarily giving
+  /// up the processor before its time slice was completed.
+  Counter* voluntary_context_switches_;
+
+  /// The number of times a context switch resulted due to a higher priority process
+  /// becoming runnable or because the current process exceeded its time slice.
+  Counter* involuntary_context_switches_;
+};
+
+/// An EventSequence captures a sequence of events (each added by
+/// calling MarkEvent). Each event has a text label, and a time
+/// (measured relative to the moment Start() was called as t=0). It is
+/// useful for tracking the evolution of some serial process, such as
+/// the query lifecycle.
+class RuntimeProfile::EventSequence {
+ public:
+  EventSequence() { }
+
+  /// Helper constructor for building from Thrift
+  EventSequence(const std::vector<int64_t>& timestamps,
+                const std::vector<std::string>& labels) {
+    DCHECK(timestamps.size() == labels.size());
+    for (int i = 0; i < timestamps.size(); ++i) {
+      events_.push_back(make_pair(labels[i], timestamps[i]));
+    }
+  }
+
+  /// Starts the timer without resetting it.
+  void Start() { sw_.Start(); }
+
+  /// Stores an event in sequence with the given label and the current time
+  /// (relative to the first time Start() was called) as the timestamp.
+  void MarkEvent(const std::string& label) {
+    Event event = make_pair(label, sw_.ElapsedTime());
+    boost::lock_guard<SpinLock> event_lock(lock_);
+    events_.push_back(event);
+  }
+
+  int64_t ElapsedTime() { return sw_.ElapsedTime(); }
+
+  /// An Event is a <label, timestamp> pair.
+  typedef std::pair<std::string, int64_t> Event;
+
+  /// An EventList is a sequence of Events, in increasing timestamp order.
+  typedef std::vector<Event> EventList;
+
+  /// Copies the member events_ into the supplied vector 'events'.
+  /// The supplied vector 'events' is cleared before this.
+  void GetEvents(std::vector<Event>* events) {
+    events->clear();
+    boost::lock_guard<SpinLock> event_lock(lock_);
+    events->insert(events->end(), events_.begin(), events_.end());
+  }
+
+  void ToThrift(TEventSequence* seq) const;
+
+ private:
+  /// Protect access to events_.
+  SpinLock lock_;
+
+  /// Stored in increasing time order.
+  EventList events_;
+
+  /// Timer which allows events to be timestamped when they are recorded.
+  MonotonicStopWatch sw_;
+};
+
+typedef StreamingSampler<int64_t, 64> StreamingCounterSampler;
+class RuntimeProfile::TimeSeriesCounter {
+ public:
+  std::string DebugString() const;
+
+  void AddSample(int ms_elapsed) {
+    int64_t sample = sample_fn_();
+    samples_.AddSample(sample, ms_elapsed);
+  }
+
+ private:
+  friend class RuntimeProfile;
+
+  TimeSeriesCounter(const std::string& name, TUnit::type unit,
+      DerivedCounterFunction fn)
+    : name_(name), unit_(unit), sample_fn_(fn) {
+  }
+
+  /// Construct a time series object from existing sample data. This counter
+  /// is then read-only (i.e. there is no sample function).
+  TimeSeriesCounter(const std::string& name, TUnit::type unit, int period,
+      const std::vector<int64_t>& values)
+    : name_(name), unit_(unit), sample_fn_(NULL), samples_(period, values) {
+  }
+
+  void ToThrift(TTimeSeriesCounter* counter);
+
+  std::string name_;
+  TUnit::type unit_;
+  DerivedCounterFunction sample_fn_;
+  StreamingCounterSampler samples_;
+};
+
+/// Counter whose value comes from an internal ConcurrentStopWatch to track the
+/// concurrent running time of multiple threads.
+class RuntimeProfile::ConcurrentTimerCounter : public Counter {
+ public:
+  ConcurrentTimerCounter(TUnit::type unit) : Counter(unit) {}
+
+  virtual int64_t value() const { return csw_.TotalRunningTime(); }
+
+  void Start() { csw_.Start(); }
+
+  void Stop() { csw_.Stop(); }
+
+  /// Returns lap time for caller who wants delta update of concurrent running time.
+  uint64_t LapTime() { return csw_.LapTime(); }
+
+  /// The value for this counter should come from internal ConcurrentStopWatch.
+  /// Set() and Add() should not be used.
+  virtual void Set(double value) {
+    DCHECK(false);
+  }
+
+  virtual void Set(int64_t value) {
+    DCHECK(false);
+  }
+
+  virtual void Set(int value) {
+    DCHECK(false);
+  }
+
+  virtual void Add(int64_t delta) {
+    DCHECK(false);
+  }
+
+ private:
+  ConcurrentStopWatch csw_;
+};
+
+/// Utility class to mark an event when the object is destroyed.
+class ScopedEvent {
+ public:
+  ScopedEvent(RuntimeProfile::EventSequence* event_sequence, const std::string& label)
+    : label_(label),
+      event_sequence_(event_sequence) {
+  }
+
+  /// Mark the event when the object is destroyed
+  ~ScopedEvent() {
+    event_sequence_->MarkEvent(label_);
+  }
+
+ private:
+  /// Disable copy constructor and assignment
+  ScopedEvent(const ScopedEvent& event);
+  ScopedEvent& operator=(const ScopedEvent& event);
+
+  const std::string label_;
+  RuntimeProfile::EventSequence* event_sequence_;
+};
+
+/// Utility class to update time elapsed when the object goes out of scope.
+/// 'T' must implement the StopWatch "interface" (Start,Stop,ElapsedTime) but
+/// we use templates to avoid virtual function overhead. In some cases
+/// the runtime profile may be deleted while the counter is still active. In this
+/// case the is_cancelled argument can be provided so that ScopedTimer will not
+/// update the counter when the query is cancelled. The destructor for ScopedTimer
+/// can access both is_cancelled and the counter, so the caller must ensure that it
+/// is safe to access both at the end of the scope in which the timer is used.
+template<class T>
+class ScopedTimer {
+ public:
+  ScopedTimer(RuntimeProfile::Counter* counter, const bool* is_cancelled = NULL) :
+    counter_(counter), is_cancelled_(is_cancelled){
+    if (counter == NULL) return;
+    DCHECK(counter->unit() == TUnit::TIME_NS);
+    sw_.Start();
+  }
+
+  void Stop() { sw_.Stop(); }
+  void Start() { sw_.Start(); }
+
+  void UpdateCounter() {
+    if (counter_ != NULL && !IsCancelled()) {
+      counter_->Add(sw_.ElapsedTime());
+    }
+  }
+
+  /// Updates the underlying counter for the final time and clears the pointer to it.
+  void ReleaseCounter() {
+    UpdateCounter();
+    counter_ = NULL;
+  }
+
+  bool IsCancelled() {
+    return is_cancelled_ != NULL && *is_cancelled_;
+  }
+
+  /// Update counter when object is destroyed
+  ~ScopedTimer() {
+    sw_.Stop();
+    UpdateCounter();
+  }
+
+ private:
+  /// Disable copy constructor and assignment
+  ScopedTimer(const ScopedTimer& timer);
+  ScopedTimer& operator=(const ScopedTimer& timer);
+
+  T sw_;
+  RuntimeProfile::Counter* counter_;
+  const bool* is_cancelled_;
+};
+
+
+#ifdef __APPLE__
+// On OS X rusage via thread is not supported. In addition, the majority of the fields of
+// the usage structs will be zeroed out. Since Apple is not going to be a major platform
+// initially it will most likely be enough to capture only time.
+// C.f. http://blog.kuriositaet.de/?p=257
+#define RUSAGE_THREAD RUSAGE_SELF
+#endif
+
+/// Utility class to update ThreadCounter when the object goes out of scope or when Stop is
+/// called. Thread measurements are taken using getrusage.
+/// This is ~5x slower than ScopedTimer due to calling getrusage.
+class ThreadCounterMeasurement {
+ public:
+  ThreadCounterMeasurement(RuntimeProfile::ThreadCounters* counters) :
+    stop_(false), counters_(counters) {
+    DCHECK(counters != NULL);
+    sw_.Start();
+    int ret = getrusage(RUSAGE_THREAD, &usage_base_);
+    DCHECK_EQ(ret, 0);
+  }
+
+  /// Stop and update the counter
+  void Stop() {
+    if (stop_) return;
+    stop_ = true;
+    sw_.Stop();
+    rusage usage;
+    int ret = getrusage(RUSAGE_THREAD, &usage);
+    DCHECK_EQ(ret, 0);
+    int64_t utime_diff =
+        (usage.ru_utime.tv_sec - usage_base_.ru_utime.tv_sec) * 1000L * 1000L * 1000L +
+        (usage.ru_utime.tv_usec - usage_base_.ru_utime.tv_usec) * 1000L;
+    int64_t stime_diff =
+        (usage.ru_stime.tv_sec - usage_base_.ru_stime.tv_sec) * 1000L * 1000L * 1000L +
+        (usage.ru_stime.tv_usec - usage_base_.ru_stime.tv_usec) * 1000L;
+    counters_->total_time_->Add(sw_.ElapsedTime());
+    counters_->user_time_->Add(utime_diff);
+    counters_->sys_time_->Add(stime_diff);
+    counters_->voluntary_context_switches_->Add(usage.ru_nvcsw - usage_base_.ru_nvcsw);
+    counters_->involuntary_context_switches_->Add(
+        usage.ru_nivcsw - usage_base_.ru_nivcsw);
+  }
+
+  /// Update counter when object is destroyed
+  ~ThreadCounterMeasurement() {
+    Stop();
+  }
+
+ private:
+  /// Disable copy constructor and assignment
+  ThreadCounterMeasurement(const ThreadCounterMeasurement& timer);
+  ThreadCounterMeasurement& operator=(const ThreadCounterMeasurement& timer);
+
+  bool stop_;
+  rusage usage_base_;
+  MonotonicStopWatch sw_;
+  RuntimeProfile::ThreadCounters* counters_;
+};
+
+}
+
+#endif

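HighWaterMarkCounter::TryAdd() and UpdateMax() in the new header are lock-free compare-and-swap retry loops. The sketch below restates the same idea with std::atomic<int64_t> instead of Impala's AtomicInt64 so that it compiles on its own; HighWaterMarkSketch is a hypothetical name and this is not the Impala class. One assumption worth flagging: std::atomic::fetch_add returns the previous value, whereas the header's use of current_value_.Add(delta) treats the return value as the new value, so the sketch adds delta back explicitly.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>

// Hypothetical stand-alone illustration of a high-water-mark counter.
class HighWaterMarkSketch {
 public:
  // Adds 'delta' unconditionally, then raises the high-water mark if needed.
  void Add(int64_t delta) {
    // fetch_add returns the value *before* the addition.
    int64_t new_val = current_.fetch_add(delta) + delta;
    UpdateMax(new_val);
  }

  // Adds 'delta' only if the result does not exceed 'max'. Returns false and
  // leaves the current value unchanged otherwise.
  bool TryAdd(int64_t delta, int64_t max) {
    int64_t old_val = current_.load();
    while (true) {
      int64_t new_val = old_val + delta;
      if (new_val > max) return false;
      // On failure, compare_exchange_weak reloads 'old_val' with the latest
      // value, so the next iteration re-checks the limit against fresh data.
      if (current_.compare_exchange_weak(old_val, new_val)) {
        UpdateMax(new_val);
        return true;
      }
    }
  }

  int64_t current_value() const { return current_.load(); }
  int64_t high_water_mark() const { return max_.load(); }

 private:
  // Publishes 'v' as the new maximum if it is larger than the current one.
  void UpdateMax(int64_t v) {
    int64_t old_max = max_.load();
    // compare_exchange_weak refreshes 'old_max' on failure, so the loop stops as
    // soon as another thread has published a maximum >= v (no redundant store).
    while (v > old_max && !max_.compare_exchange_weak(old_max, v)) {
    }
  }

  std::atomic<int64_t> current_{0};
  std::atomic<int64_t> max_{0};
};
```

As in the header, the current value and the maximum are two separate atomics, so a reader may briefly observe a current value above the published maximum while another thread is inside UpdateMax().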
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/util/runtime-profile-test.cc
----------------------------------------------------------------------
diff --git a/be/src/util/runtime-profile-test.cc b/be/src/util/runtime-profile-test.cc
index d98b54c..2c80082 100644
--- a/be/src/util/runtime-profile-test.cc
+++ b/be/src/util/runtime-profile-test.cc
@@ -20,7 +20,7 @@
 #include "common/object-pool.h"
 #include "util/cpu-info.h"
 #include "util/periodic-counter-updater.h"
-#include "util/runtime-profile.h"
+#include "util/runtime-profile-counters.h"
 #include "util/streaming-sampler.h"
 #include "util/thread.h"
 #include "util/time.h"

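ScopedTimer, now defined in runtime-profile-counters.h, is an RAII timer that is templated on its stopwatch type to avoid virtual calls, and it skips the counter update when an optional cancellation flag is set. A minimal sketch of that pattern follows, assuming std::chrono::steady_clock as the monotonic clock; SketchCounter, SteadyStopWatch, FakeStopWatch and ScopedTimerSketch are hypothetical names for illustration only, not Impala types.

```cpp
#include <cassert>
#include <chrono>
#include <cstdint>

// Hypothetical stand-in for RuntimeProfile::Counter: it only accumulates a value.
struct SketchCounter {
  int64_t value = 0;
  void Add(int64_t delta) { value += delta; }
};

// Monotonic stopwatch on std::chrono::steady_clock. Start() does not reset it.
class SteadyStopWatch {
  using Clock = std::chrono::steady_clock;

 public:
  void Start() {
    if (!running_) {
      start_ = Clock::now();
      running_ = true;
    }
  }
  void Stop() {
    if (running_) {
      total_ += Clock::now() - start_;
      running_ = false;
    }
  }
  // Accumulated nanoseconds, including the current run if still running.
  int64_t ElapsedTime() const {
    Clock::duration d = total_;
    if (running_) d += Clock::now() - start_;
    return std::chrono::duration_cast<std::chrono::nanoseconds>(d).count();
  }

 private:
  Clock::time_point start_;
  Clock::duration total_ = Clock::duration::zero();
  bool running_ = false;
};

// Fake stopwatch for deterministic tests: the elapsed time is set by hand.
struct FakeStopWatch {
  int64_t elapsed = 0;
  void Start() {}
  void Stop() {}
  int64_t ElapsedTime() const { return elapsed; }
};

// 'T' only needs Start(), Stop() and ElapsedTime(); no virtual dispatch is used.
template <class T>
class ScopedTimerSketch {
 public:
  explicit ScopedTimerSketch(SketchCounter* counter, const bool* is_cancelled = nullptr)
      : counter_(counter), is_cancelled_(is_cancelled) {
    if (counter_ != nullptr) sw_.Start();
  }

  // Adds the elapsed time on scope exit, unless the cancellation flag is set.
  ~ScopedTimerSketch() {
    sw_.Stop();
    bool cancelled = is_cancelled_ != nullptr && *is_cancelled_;
    if (counter_ != nullptr && !cancelled) counter_->Add(sw_.ElapsedTime());
  }

  // Exposed only so a test can drive a fake stopwatch.
  T& stopwatch() { return sw_; }

  ScopedTimerSketch(const ScopedTimerSketch&) = delete;
  ScopedTimerSketch& operator=(const ScopedTimerSketch&) = delete;

 private:
  T sw_;
  SketchCounter* counter_;
  const bool* is_cancelled_;
};
```

Because the stopwatch is a template parameter, a fake clock can be swapped in for tests at no runtime cost to the production instantiation.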
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/util/runtime-profile.cc
----------------------------------------------------------------------
diff --git a/be/src/util/runtime-profile.cc b/be/src/util/runtime-profile.cc
index bc7447a..714180a 100644
--- a/be/src/util/runtime-profile.cc
+++ b/be/src/util/runtime-profile.cc
@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-#include "util/runtime-profile.h"
+#include "util/runtime-profile-counters.h"
 
 #include <iomanip>
 #include <iostream>

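With this change, runtime-profile.h only forward-declares the nested counter classes (class AveragedCounter; class EventSequence; and so on), and translation units that need their full definitions, such as runtime-profile.cc, include runtime-profile-counters.h instead. The single-file sketch below illustrates that language pattern with hypothetical names (Profile, DoublingCounter): a nested class is declared inside its enclosing class, used only through pointers there, and defined later outside it.

```cpp
#include <cassert>
#include <cstdint>
#include <memory>
#include <vector>

// ---- What a slim "profile.h" might contain ----
class Profile {
 public:
  class Counter {
   public:
    explicit Counter(int64_t v = 0) : value_(v) {}
    virtual ~Counter() = default;
    virtual int64_t value() const { return value_; }

   protected:
    int64_t value_;
  };

  // Forward declaration only: code that includes this header but never touches
  // DoublingCounter does not need its definition or its dependencies.
  class DoublingCounter;

  // Declaring a function that returns a pointer to the incomplete type is legal.
  DoublingCounter* AddDoublingCounter(int64_t v);

 private:
  // Owns every counter it hands out (similar in spirit to an object pool).
  std::vector<std::unique_ptr<Counter>> pool_;
};

// ---- What a "profile-counters.h" might contain: the full definition ----
class Profile::DoublingCounter : public Profile::Counter {
 public:
  explicit DoublingCounter(int64_t v) : Counter(v) {}
  int64_t value() const override { return 2 * value_; }
};

// ---- What "profile.cc" might contain: it needs the complete type ----
Profile::DoublingCounter* Profile::AddDoublingCounter(int64_t v) {
  DoublingCounter* c = new DoublingCounter(v);
  pool_.push_back(std::unique_ptr<Counter>(c));
  return c;
}
```

The benefit of this split is compile-time isolation: heavyweight includes such as the stopwatch and sampler headers move out of the widely included header and are paid for only by files that actually construct or call into the nested classes.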
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/util/runtime-profile.h
----------------------------------------------------------------------
diff --git a/be/src/util/runtime-profile.h b/be/src/util/runtime-profile.h
index 8030d7b..98a02ed 100644
--- a/be/src/util/runtime-profile.h
+++ b/be/src/util/runtime-profile.h
@@ -17,64 +17,16 @@
 #define IMPALA_UTIL_RUNTIME_PROFILE_H
 
 #include <boost/function.hpp>
-#include <boost/scoped_ptr.hpp>
-#include <boost/unordered_map.hpp>
+#include <boost/thread/lock_guard.hpp>
 #include <iostream>
-#include <sys/time.h>
-#include <sys/resource.h>
 
 #include "common/atomic.h"
-#include "common/logging.h"
-#include "common/object-pool.h"
-#include "util/stopwatch.h"
-#include "util/streaming-sampler.h"
+#include "util/spinlock.h"
+
 #include "gen-cpp/RuntimeProfile_types.h"
 
 namespace impala {
 
-/// Define macros for updating counters.  The macros make it very easy to disable
-/// all counters at compile time.  Set this to 0 to remove counters.  This is useful
-/// to do to make sure the counters aren't affecting the system.
-#define ENABLE_COUNTERS 1
-
-/// Some macro magic to generate unique ids using __COUNTER__
-#define CONCAT_IMPL(x, y) x##y
-#define MACRO_CONCAT(x, y) CONCAT_IMPL(x, y)
-
-#if ENABLE_COUNTERS
-  #define ADD_COUNTER(profile, name, unit) (profile)->AddCounter(name, unit)
-  #define ADD_TIME_SERIES_COUNTER(profile, name, src_counter) \
-      (profile)->AddTimeSeriesCounter(name, src_counter)
-  #define ADD_TIMER(profile, name) (profile)->AddCounter(name, TUnit::TIME_NS)
-  #define ADD_CHILD_TIMER(profile, name, parent) \
-      (profile)->AddCounter(name, TUnit::TIME_NS, parent)
-  #define SCOPED_TIMER(c) \
-      ScopedTimer<MonotonicStopWatch> MACRO_CONCAT(SCOPED_TIMER, __COUNTER__)(c)
-  #define CANCEL_SAFE_SCOPED_TIMER(c, is_cancelled) \
-      ScopedTimer<MonotonicStopWatch> MACRO_CONCAT(SCOPED_TIMER, __COUNTER__)(c, is_cancelled)
-  #define COUNTER_ADD(c, v) (c)->Add(v)
-  #define COUNTER_SET(c, v) (c)->Set(v)
-  #define ADD_THREAD_COUNTERS(profile, prefix) (profile)->AddThreadCounters(prefix)
-  #define SCOPED_THREAD_COUNTER_MEASUREMENT(c) \
-    ThreadCounterMeasurement \
-      MACRO_CONCAT(SCOPED_THREAD_COUNTER_MEASUREMENT, __COUNTER__)(c)
-  #define SCOPED_CONCURRENT_COUNTER(c) \
-    ScopedStopWatch<RuntimeProfile::ConcurrentTimerCounter> \
-      MACRO_CONCAT(SCOPED_CONCURRENT_COUNTER, __COUNTER__)(c)
-#else
-  #define ADD_COUNTER(profile, name, unit) NULL
-  #define ADD_TIME_SERIES_COUNTER(profile, name, src_counter) NULL
-  #define ADD_TIMER(profile, name) NULL
-  #define ADD_CHILD_TIMER(profile, name, parent) NULL
-  #define SCOPED_TIMER(c)
-  #define CANCEL_SAFE_SCOPED_TIMER(c)
-  #define COUNTER_ADD(c, v)
-  #define COUNTER_SET(c, v)
-  #define ADD_THREAD_COUNTERS(profile, prefix) NULL
-  #define SCOPED_THREAD_COUNTER_MEASUREMENT(c)
-  #define SCOPED_CONCURRENT_COUNTER(c)
-#endif
-
 class ObjectPool;
 
 /// Runtime profile is a group of profiling counters.  It supports adding named counters
@@ -133,285 +85,16 @@ class RuntimeProfile {
     TUnit::type unit_;
   };
 
-  /// A counter that keeps track of the highest value seen (reporting that
-  /// as value()) and the current value.
-  class HighWaterMarkCounter : public Counter {
-   public:
-    HighWaterMarkCounter(TUnit::type unit) : Counter(unit) {}
-
-    virtual void Add(int64_t delta) {
-      int64_t new_val = current_value_.Add(delta);
-      UpdateMax(new_val);
-    }
-
-    /// Tries to increase the current value by delta. If current_value() + delta
-    /// exceeds max, return false and current_value is not changed.
-    bool TryAdd(int64_t delta, int64_t max) {
-      while (true) {
-        int64_t old_val = current_value_.Load();
-        int64_t new_val = old_val + delta;
-        if (UNLIKELY(new_val > max)) return false;
-        if (LIKELY(current_value_.CompareAndSwap(old_val, new_val))) {
-          UpdateMax(new_val);
-          return true;
-        }
-      }
-    }
-
-    virtual void Set(int64_t v) {
-      current_value_.Store(v);
-      UpdateMax(v);
-    }
-
-    int64_t current_value() const { return current_value_.Load(); }
-
-   private:
-    /// Set 'value_' to 'v' if 'v' is larger than 'value_'. The entire operation is
-    /// atomic.
-    void UpdateMax(int64_t v) {
-      while (true) {
-        int64_t old_max = value_.Load();
-        int64_t new_max = std::max(old_max, v);
-        if (new_max == old_max) break; // Avoid atomic update.
-        if (LIKELY(value_.CompareAndSwap(old_max, new_max))) break;
-      }
-    }
-
-    /// The current value of the counter. value_ in the super class represents
-    /// the high water mark.
-    AtomicInt64 current_value_;
-  };
+  class AveragedCounter;
+  class ConcurrentTimerCounter;
+  class DerivedCounter;
+  class EventSequence;
+  class HighWaterMarkCounter;
+  class ThreadCounters;
+  class TimeSeriesCounter;
 
   typedef boost::function<int64_t ()> DerivedCounterFunction;
 
-  /// A DerivedCounter also has a name and unit, but the value is computed.
-  /// Do not call Set() and Add().
-  class DerivedCounter : public Counter {
-   public:
-    DerivedCounter(TUnit::type unit, const DerivedCounterFunction& counter_fn)
-      : Counter(unit),
-        counter_fn_(counter_fn) {}
-
-    virtual int64_t value() const {
-      return counter_fn_();
-    }
-
-   private:
-    DerivedCounterFunction counter_fn_;
-  };
-
-  /// An AveragedCounter maintains a set of counters and its value is the
-  /// average of the values in that set. The average is updated through calls
-  /// to UpdateCounter(), which may add a new counter or update an existing counter.
-  /// Set() and Add() should not be called.
-  class AveragedCounter : public Counter {
-   public:
-    AveragedCounter(TUnit::type unit)
-     : Counter(unit),
-       current_double_sum_(0.0),
-       current_int_sum_(0) {
-    }
-
-    /// Update counter_value_map_ with the new counter. This may require the counter
-    /// to be added to the map.
-    /// No locks are obtained within this class because UpdateCounter() is called from
-    /// UpdateAverage(), which obtains locks on the entire counter map in a profile.
-    void UpdateCounter(Counter* new_counter) {
-      DCHECK_EQ(new_counter->unit_, unit_);
-      boost::unordered_map<Counter*, int64_t>::iterator it =
-          counter_value_map_.find(new_counter);
-      int64_t old_val = 0;
-      if (it != counter_value_map_.end()) {
-        old_val = it->second;
-        it->second = new_counter->value();
-      } else {
-        counter_value_map_[new_counter] = new_counter->value();
-      }
-
-      if (unit_ == TUnit::DOUBLE_VALUE) {
-        double old_double_val = *reinterpret_cast<double*>(&old_val);
-        current_double_sum_ += (new_counter->double_value() - old_double_val);
-        double result_val = current_double_sum_ / (double) counter_value_map_.size();
-        value_.Store(*reinterpret_cast<int64_t*>(&result_val));
-      } else {
-        current_int_sum_ += (new_counter->value() - old_val);
-        value_.Store(current_int_sum_ / counter_value_map_.size());
-      }
-    }
-
-    /// The value for this counter should be updated through UpdateCounter().
-    /// Set() and Add() should not be used.
-    virtual void Set(double value) {
-      DCHECK(false);
-    }
-
-    virtual void Set(int64_t value) {
-      DCHECK(false);
-    }
-
-    virtual void Add(int64_t delta) {
-      DCHECK(false);
-    }
-
-   private:
-    /// Map from counters to their existing values. Modified via UpdateCounter().
-    boost::unordered_map<Counter*, int64_t> counter_value_map_;
-
-    /// Current sums of values from counter_value_map_. Only one of these is used,
-    /// depending on the unit of the counter. current_double_sum_ is used for
-    /// DOUBLE_VALUE, current_int_sum_ otherwise.
-    double current_double_sum_;
-    int64_t current_int_sum_;
-  };
-
-  /// A set of counters that measure thread info, such as total time, user time, sys time.
-  class ThreadCounters {
-   private:
-    friend class ThreadCounterMeasurement;
-    friend class RuntimeProfile;
-
-    Counter* total_time_; // total wall clock time
-    Counter* user_time_;  // user CPU time
-    Counter* sys_time_;   // system CPU time
-
-    /// The number of times a context switch resulted due to a process voluntarily giving
-    /// up the processor before its time slice was completed.
-    Counter* voluntary_context_switches_;
-
-    /// The number of times a context switch resulted due to a higher priority process
-    /// becoming runnable or because the current process exceeded its time slice.
-    Counter* involuntary_context_switches_;
-  };
-
-  /// An EventSequence captures a sequence of events (each added by
-  /// calling MarkEvent). Each event has a text label, and a time
-  /// (measured relative to the moment Start() was called as t=0). It is
-  /// useful for tracking the evolution of some serial process, such as
-  /// the query lifecycle.
-  class EventSequence {
-   public:
-    EventSequence() { }
-
-    /// Helper constructor for building from Thrift
-    EventSequence(const std::vector<int64_t>& timestamps,
-                  const std::vector<std::string>& labels) {
-      DCHECK(timestamps.size() == labels.size());
-      for (int i = 0; i < timestamps.size(); ++i) {
-        events_.push_back(make_pair(labels[i], timestamps[i]));
-      }
-    }
-
-    /// Starts the timer without resetting it.
-    void Start() { sw_.Start(); }
-
-    /// Stores an event in sequence with the given label and the current time
-    /// (relative to the first time Start() was called) as the timestamp.
-    void MarkEvent(const std::string& label) {
-      Event event = make_pair(label, sw_.ElapsedTime());
-      boost::lock_guard<SpinLock> event_lock(lock_);
-      events_.push_back(event);
-    }
-
-    int64_t ElapsedTime() { return sw_.ElapsedTime(); }
-
-    /// An Event is a <label, timestamp> pair.
-    typedef std::pair<std::string, int64_t> Event;
-
-    /// An EventList is a sequence of Events, in increasing timestamp order.
-    typedef std::vector<Event> EventList;
-
-    /// Copies the member events_ into the supplied vector 'events'.
-    /// The supplied vector 'events' is cleared before this.
-    void GetEvents(std::vector<Event>* events) {
-      events->clear();
-      boost::lock_guard<SpinLock> event_lock(lock_);
-      events->insert(events->end(), events_.begin(), events_.end());
-    }
-
-    void ToThrift(TEventSequence* seq) const;
-
-   private:
-    /// Protect access to events_.
-    SpinLock lock_;
-
-    /// Stored in increasing time order.
-    EventList events_;
-
-    /// Timer which allows events to be timestamped when they are recorded.
-    MonotonicStopWatch sw_;
-  };
-
-  typedef StreamingSampler<int64_t, 64> StreamingCounterSampler;
-  class TimeSeriesCounter {
-   public:
-    std::string DebugString() const;
-
-    void AddSample(int ms_elapsed) {
-      int64_t sample = sample_fn_();
-      samples_.AddSample(sample, ms_elapsed);
-    }
-
-   private:
-    friend class RuntimeProfile;
-
-    TimeSeriesCounter(const std::string& name, TUnit::type unit,
-        DerivedCounterFunction fn)
-      : name_(name), unit_(unit), sample_fn_(fn) {
-    }
-
-    /// Construct a time series object from existing sample data. This counter
-    /// is then read-only (i.e. there is no sample function).
-    TimeSeriesCounter(const std::string& name, TUnit::type unit, int period,
-        const std::vector<int64_t>& values)
-      : name_(name), unit_(unit), sample_fn_(NULL), samples_(period, values) {
-    }
-
-    void ToThrift(TTimeSeriesCounter* counter);
-
-    std::string name_;
-    TUnit::type unit_;
-    DerivedCounterFunction sample_fn_;
-    StreamingCounterSampler samples_;
-  };
-
-  /// Counter whose value comes from an internal ConcurrentStopWatch to track multiple threads
-  /// concurrent running time.
-  class ConcurrentTimerCounter : public Counter {
-   public:
-    ConcurrentTimerCounter(TUnit::type unit) : Counter(unit) {}
-
-    virtual int64_t value() const { return csw_.TotalRunningTime(); }
-
-    void Start() { csw_.Start(); }
-
-    void Stop() { csw_.Stop(); }
-
-    /// Returns lap time for caller who wants delta update of concurrent running time.
-    uint64_t LapTime() { return csw_.LapTime(); }
-
-    /// The value for this counter should come from internal ConcurrentStopWatch.
-    /// Set() and Add() should not be used.
-    virtual void Set(double value) {
-      DCHECK(false);
-    }
-
-    virtual void Set(int64_t value) {
-      DCHECK(false);
-    }
-
-    virtual void Set(int value) {
-      DCHECK(false);
-    }
-
-    virtual void Add(int64_t delta) {
-      DCHECK(false);
-    }
-
-   private:
-    ConcurrentStopWatch csw_;
-  };
-
-
   /// Create a runtime profile object with 'name'.  Counters and merged profile are
   /// allocated from pool.
   /// If is_averaged_profile is true, the counters in this profile will be derived
@@ -721,166 +404,6 @@ class RuntimeProfile {
       const ChildCounterMap& child_counter_map, std::ostream* s);
 };
 
-/// Utility class to mark an event when the object is destroyed.
-class ScopedEvent {
- public:
-  ScopedEvent(RuntimeProfile::EventSequence* event_sequence, const std::string& label)
-    : label_(label),
-      event_sequence_(event_sequence) {
-  }
-
-  /// Mark the event when the object is destroyed
-  ~ScopedEvent() {
-    event_sequence_->MarkEvent(label_);
-  }
-
- private:
-  /// Disable copy constructor and assignment
-  ScopedEvent(const ScopedEvent& event);
-  ScopedEvent& operator=(const ScopedEvent& event);
-
-  const std::string label_;
-  RuntimeProfile::EventSequence* event_sequence_;
-};
-
-/// Utility class to update the counter at object construction and destruction.
-/// When the object is constructed, decrement the counter by val.
-/// When the object goes out of scope, increment the counter by val.
-class ScopedCounter {
- public:
-  ScopedCounter(RuntimeProfile::Counter* counter, int64_t val) :
-    val_(val),
-    counter_(counter) {
-    if (counter == NULL) return;
-    counter_->Add(-1L * val_);
-  }
-
-  /// Increment the counter when object is destroyed
-  ~ScopedCounter() {
-    if (counter_ != NULL) counter_->Add(val_);
-  }
-
- private:
-  /// Disable copy constructor and assignment
-  ScopedCounter(const ScopedCounter& counter);
-  ScopedCounter& operator=(const ScopedCounter& counter);
-
-  int64_t val_;
-  RuntimeProfile::Counter* counter_;
-};
-
-/// Utility class to update time elapsed when the object goes out of scope.
-/// 'T' must implement the StopWatch "interface" (Start,Stop,ElapsedTime) but
-/// we use templates not to pay for virtual function overhead. In some cases
-/// the runtime profile may be deleted while the counter is still active. In this
-/// case the is_cancelled argument can be provided so that ScopedTimer will not
-/// update the counter when the query is cancelled. The destructor for ScopedTimer
-/// can access both is_cancelled and the counter, so the caller must ensure that it
-/// is safe to access both at the end of the scope in which the timer is used.
-template<class T>
-class ScopedTimer {
- public:
-  ScopedTimer(RuntimeProfile::Counter* counter, const bool* is_cancelled = NULL) :
-    counter_(counter), is_cancelled_(is_cancelled){
-    if (counter == NULL) return;
-    DCHECK(counter->unit() == TUnit::TIME_NS);
-    sw_.Start();
-  }
-
-  void Stop() { sw_.Stop(); }
-  void Start() { sw_.Start(); }
-
-  void UpdateCounter() {
-    if (counter_ != NULL && !IsCancelled()) {
-      counter_->Add(sw_.ElapsedTime());
-    }
-  }
-
-  /// Updates the underlying counter for the final time and clears the pointer to it.
-  void ReleaseCounter() {
-    UpdateCounter();
-    counter_ = NULL;
-  }
-
-  bool IsCancelled() {
-    return is_cancelled_ != NULL && *is_cancelled_;
-  }
-
-  /// Update counter when object is destroyed
-  ~ScopedTimer() {
-    sw_.Stop();
-    UpdateCounter();
-  }
-
- private:
-  /// Disable copy constructor and assignment
-  ScopedTimer(const ScopedTimer& timer);
-  ScopedTimer& operator=(const ScopedTimer& timer);
-
-  T sw_;
-  RuntimeProfile::Counter* counter_;
-  const bool* is_cancelled_;
-};
-
-#ifdef __APPLE__
-// On OS X rusage via thread is not supported. In addition, the majority of the fields of
-// the usage structs will be zeroed out. Since Apple is not going to be a major plaform
-// initially it will most likely be enough to capture only time.
-// C.f. http://blog.kuriositaet.de/?p=257
-#define RUSAGE_THREAD RUSAGE_SELF
-#endif
-
-/// Utility class to update ThreadCounter when the object goes out of scope or when Stop is
-/// called. Threads measurements will then be taken using getrusage.
-/// This is ~5x slower than ScopedTimer due to calling getrusage.
-class ThreadCounterMeasurement {
- public:
-  ThreadCounterMeasurement(RuntimeProfile::ThreadCounters* counters) :
-    stop_(false), counters_(counters) {
-    DCHECK(counters != NULL);
-    sw_.Start();
-    int ret = getrusage(RUSAGE_THREAD, &usage_base_);
-    DCHECK_EQ(ret, 0);
-  }
-
-  /// Stop and update the counter
-  void Stop() {
-    if (stop_) return;
-    stop_ = true;
-    sw_.Stop();
-    rusage usage;
-    int ret = getrusage(RUSAGE_THREAD, &usage);
-    DCHECK_EQ(ret, 0);
-    int64_t utime_diff =
-        (usage.ru_utime.tv_sec - usage_base_.ru_utime.tv_sec) * 1000L * 1000L * 1000L +
-        (usage.ru_utime.tv_usec - usage_base_.ru_utime.tv_usec) * 1000L;
-    int64_t stime_diff =
-        (usage.ru_stime.tv_sec - usage_base_.ru_stime.tv_sec) * 1000L * 1000L * 1000L +
-        (usage.ru_stime.tv_usec - usage_base_.ru_stime.tv_usec) * 1000L;
-    counters_->total_time_->Add(sw_.ElapsedTime());
-    counters_->user_time_->Add(utime_diff);
-    counters_->sys_time_->Add(stime_diff);
-    counters_->voluntary_context_switches_->Add(usage.ru_nvcsw - usage_base_.ru_nvcsw);
-    counters_->involuntary_context_switches_->Add(
-        usage.ru_nivcsw - usage_base_.ru_nivcsw);
-  }
-
-  /// Update counter when object is destroyed
-  ~ThreadCounterMeasurement() {
-    Stop();
-  }
-
- private:
-  /// Disable copy constructor and assignment
-  ThreadCounterMeasurement(const ThreadCounterMeasurement& timer);
-  ThreadCounterMeasurement& operator=(const ThreadCounterMeasurement& timer);
-
-  bool stop_;
-  rusage usage_base_;
-  MonotonicStopWatch sw_;
-  RuntimeProfile::ThreadCounters* counters_;
-};
-
 }
 
 #endif
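
The ScopedEvent, ScopedCounter, ScopedTimer and ThreadCounterMeasurement helpers removed from runtime-profile.h above are RAII wrappers. The constructor starts a measurement and the destructor folds the result into a counter. The following is a minimal standalone sketch of that pattern, not Impala's code. `Counter`, `SimpleStopWatch`, `ScopedTimerSketch` and `TimevalDiffNs` are hypothetical stand-ins, and std::chrono replaces Impala's MonotonicStopWatch. The last helper repeats the timeval-to-nanosecond arithmetic that ThreadCounterMeasurement::Stop() applies to getrusage() results.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <cstdint>
#include <sys/time.h>

// Hypothetical stand-in for RuntimeProfile::Counter: a thread-safe int64 sum.
class Counter {
 public:
  void Add(int64_t delta) { value_ += delta; }
  int64_t value() const { return value_.load(); }
 private:
  std::atomic<int64_t> value_{0};
};

// Hypothetical stopwatch providing the Start/Stop/ElapsedTime "interface"
// that ScopedTimer's template parameter is expected to implement.
class SimpleStopWatch {
 public:
  void Start() {
    if (running_) return;
    start_ = std::chrono::steady_clock::now();
    running_ = true;
  }
  void Stop() {
    if (!running_) return;
    total_ += std::chrono::steady_clock::now() - start_;
    running_ = false;
  }
  // Accumulated nanoseconds, including the current run if still running.
  int64_t ElapsedTime() const {
    auto total = total_;
    if (running_) total += std::chrono::steady_clock::now() - start_;
    return std::chrono::duration_cast<std::chrono::nanoseconds>(total).count();
  }
 private:
  std::chrono::steady_clock::time_point start_;
  std::chrono::steady_clock::duration total_{0};
  bool running_ = false;
};

// Sketch of the ScopedTimer idea: time the enclosing scope and add the
// elapsed nanoseconds to 'counter' on destruction, unless '*is_cancelled'.
template <class T>
class ScopedTimerSketch {
 public:
  explicit ScopedTimerSketch(Counter* counter, const bool* is_cancelled = nullptr)
    : counter_(counter), is_cancelled_(is_cancelled) {
    if (counter_ != nullptr) sw_.Start();
  }
  ~ScopedTimerSketch() {
    sw_.Stop();
    bool cancelled = is_cancelled_ != nullptr && *is_cancelled_;
    if (counter_ != nullptr && !cancelled) counter_->Add(sw_.ElapsedTime());
  }
  // Non-copyable, like the original (which declared private copy members).
  ScopedTimerSketch(const ScopedTimerSketch&) = delete;
  ScopedTimerSketch& operator=(const ScopedTimerSketch&) = delete;
 private:
  T sw_;
  Counter* counter_;
  const bool* is_cancelled_;
};

// Same arithmetic ThreadCounterMeasurement::Stop() applies to ru_utime and
// ru_stime: seconds * 1e9 plus microseconds * 1e3 gives nanoseconds.
inline int64_t TimevalDiffNs(const timeval& end, const timeval& begin) {
  return (end.tv_sec - begin.tv_sec) * 1000L * 1000L * 1000L +
         (end.tv_usec - begin.tv_usec) * 1000L;
}
```

Taking the stopwatch as a template parameter mirrors the original comment's rationale: it avoids virtual-call overhead on a hot path. The cancellation pointer lets a caller skip the final update when the profile may already be gone.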

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/util/simple-logger.cc
----------------------------------------------------------------------
diff --git a/be/src/util/simple-logger.cc b/be/src/util/simple-logger.cc
index 6fca3fa..87ef0f5 100644
--- a/be/src/util/simple-logger.cc
+++ b/be/src/util/simple-logger.cc
@@ -18,6 +18,7 @@
 #include <boost/date_time/posix_time/posix_time_types.hpp>
 #include <boost/filesystem.hpp>
 #include <gutil/strings/substitute.h>
+#include <boost/thread/lock_guard.hpp>
 
 #include "common/names.h"
 #include "util/logging-support.h"

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/util/simple-logger.h
----------------------------------------------------------------------
diff --git a/be/src/util/simple-logger.h b/be/src/util/simple-logger.h
index 66031fd..af17d26 100644
--- a/be/src/util/simple-logger.h
+++ b/be/src/util/simple-logger.h
@@ -15,8 +15,8 @@
 #ifndef IMPALA_SERVICE_SIMPLE_LOGGER_H
 #define IMPALA_SERVICE_SIMPLE_LOGGER_H
 
-#include <boost/thread/thread.hpp>
 #include <fstream>
+#include <boost/thread/mutex.hpp>
 
 #include "common/status.h"
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/util/streaming-sampler.h
----------------------------------------------------------------------
diff --git a/be/src/util/streaming-sampler.h b/be/src/util/streaming-sampler.h
index 9cf4151..71e7548 100644
--- a/be/src/util/streaming-sampler.h
+++ b/be/src/util/streaming-sampler.h
@@ -17,6 +17,8 @@
 
 #include <string.h>
 #include <iostream>
+#include <boost/thread/lock_guard.hpp>
+
 #include "util/spinlock.h"
 
 namespace impala {
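
streaming-sampler.h now includes boost/thread/lock_guard.hpp explicitly instead of relying on a transitive include. `boost::lock_guard`, like `std::lock_guard`, only requires a type with `lock()` and `unlock()`, which is presumably how it is paired with the SpinLock from util/spinlock.h (an assumption; the header body is not part of this diff). The following standalone sketch shows that pairing with the standard-library equivalents. `ToySpinLock` and `GuardedSum` are hypothetical and are not Impala classes.

```cpp
#include <atomic>
#include <cassert>
#include <mutex>

// Hypothetical minimal spinlock. Any type with lock()/unlock() satisfies the
// BasicLockable requirement that std::lock_guard (and boost::lock_guard) need.
class ToySpinLock {
 public:
  void lock() {
    while (flag_.test_and_set(std::memory_order_acquire)) {
      // Busy-wait until the current holder clears the flag.
    }
  }
  void unlock() { flag_.clear(std::memory_order_release); }
 private:
  std::atomic_flag flag_ = ATOMIC_FLAG_INIT;
};

// Hypothetical shared state updated under the spinlock.
class GuardedSum {
 public:
  void Add(long v) {
    std::lock_guard<ToySpinLock> l(lock_);  // Unlocked when 'l' leaves scope.
    sum_ += v;
  }
  long sum() {
    std::lock_guard<ToySpinLock> l(lock_);
    return sum_;
  }
 private:
  ToySpinLock lock_;
  long sum_ = 0;
};
```

Including the narrow lock_guard.hpp header, rather than the umbrella boost/thread/thread.hpp removed from simple-logger.h, keeps each file's dependencies explicit.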

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/util/tuple-row-compare.cc
----------------------------------------------------------------------
diff --git a/be/src/util/tuple-row-compare.cc b/be/src/util/tuple-row-compare.cc
index a0bbf97..18ae53d 100644
--- a/be/src/util/tuple-row-compare.cc
+++ b/be/src/util/tuple-row-compare.cc
@@ -19,6 +19,7 @@
 #include "codegen/codegen-anyval.h"
 #include "codegen/llvm-codegen.h"
 #include "runtime/runtime-state.h"
+#include "util/runtime-profile-counters.h"
 
 using namespace impala;
 using namespace llvm;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/util/tuple-row-compare.h
----------------------------------------------------------------------
diff --git a/be/src/util/tuple-row-compare.h b/be/src/util/tuple-row-compare.h
index 8505f19..d20d864 100644
--- a/be/src/util/tuple-row-compare.h
+++ b/be/src/util/tuple-row-compare.h
@@ -19,10 +19,11 @@
 #include "exec/sort-exec-exprs.h"
 #include "exprs/expr.h"
 #include "exprs/expr-context.h"
+#include "runtime/descriptors.h"
+#include "runtime/raw-value.h"
 #include "runtime/raw-value.inline.h"
 #include "runtime/tuple.h"
 #include "runtime/tuple-row.h"
-#include "runtime/descriptors.h"
 
 namespace impala {