You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2016/05/17 08:30:17 UTC

[1/4] incubator-impala git commit: IMPALA-3489: Add script to extract breakpad symbols from binaries

Repository: incubator-impala
Updated Branches:
  refs/heads/master 4c9c74dd3 -> a59408b57


IMPALA-3489: Add script to extract breakpad symbols from binaries

Change-Id: I3ee0972efcb50609407b04cd6f4309b244a84861
Reviewed-on: http://gerrit.cloudera.org:8080/2961
Reviewed-by: Lars Volker <lv...@cloudera.com>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/12799fae
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/12799fae
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/12799fae

Branch: refs/heads/master
Commit: 12799fae6c01d2ca29fcab37d6d7f2a8a6a409df
Parents: 4c9c74d
Author: Lars Volker <lv...@cloudera.com>
Authored: Thu May 5 14:37:10 2016 +0200
Committer: Tim Armstrong <ta...@cloudera.com>
Committed: Tue May 17 01:30:11 2016 -0700

----------------------------------------------------------------------
 bin/dump_breakpad_symbols.py       | 275 ++++++++++++++++++++++++++++++++
 infra/python/deps/requirements.txt |   1 +
 2 files changed, 276 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/12799fae/bin/dump_breakpad_symbols.py
----------------------------------------------------------------------
diff --git a/bin/dump_breakpad_symbols.py b/bin/dump_breakpad_symbols.py
new file mode 100755
index 0000000..a164783
--- /dev/null
+++ b/bin/dump_breakpad_symbols.py
@@ -0,0 +1,275 @@
+#!/usr/bin/env impala-python
+# Copyright 2016 Cloudera Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# This script can be used to dump symbols using the 'dump_syms' binary, which is contained
+# in Google Breakpad. It supports collecting binary files from different sources:
+#
+#  - Scan an Impala build dir for ELF files
+#  - Read a newline-separated list of file paths from stdin
+#  - Process a list of one or multiple explicitly specified files
+#  - Extract an Impala rpm and corresponding debuginfo rpm file, scan for ELF files, and
+#    process them together with their respective .debug file.
+#
+# Dependencies:
+#  - rpm2cpio (sudo apt-get -y install rpm2cpio)
+#  - cpio (sudo apt-get -y install cpio)
+#  - Google Breakpad, either installed via the Impala toolchain or separately
+#
+# Usage: dump_breakpad_symbols.py -h
+#
+# Typical usage patterns:
+# -----------------------
+#
+# * Extract symbols from an rpm file and its debuginfo counterpart:
+#   ./dump_breakpad_symbols.py -d /tmp/syms \
+#   -r tmp/impala-2.5.0+cdh5.7.0+0-1.cdh5.7.0.p0.147.el6.x86_64.rpm \
+#   -s tmp/impala-debuginfo-2.5.0+cdh5.7.0+0-1.cdh5.7.0.p0.147.el6.x86_64.rpm
+#
+#   Note that this will process all ELF binaries in the rpm, including both debug and
+#   release builds. Files are identified by hashes, so you don't need to worry about
+#   collisions and you can expect it to 'just work'.
+#
+# * Scan an impalad build directory and extract Breakpad symbols from all binaries:
+#   ./dump_breakpad_symbols.py -d /tmp/syms -b be/build/debug
+#
+# * After symbol extraction, use the 'minidump_stackwalk' tool to process a minidump file:
+#   $IMPALA_TOOLCHAIN/breakpad-*/bin/minidump_stackwalk \
+#   /tmp/impala-minidumps/impalad/03c0ee26-bfd1-cf3e-43fa49ca-1a6aae25.dmp /tmp/syms
+
+import errno
+import logging
+import glob
+import magic
+import os
+import shutil
+import subprocess
+import sys
+import tempfile
+
+from argparse import ArgumentParser
+from collections import namedtuple
+
+logging.basicConfig(level=logging.INFO)
+
+BinaryDebugInfo = namedtuple('BinaryDebugInfo', 'path, debug_path')
+
+
+def die(msg=''):
+  """End the process, optionally after printing the passed error message."""
+  logging.error('ERROR: %s\n' % msg)
+  sys.exit(1)
+
+
+def find_dump_syms_binary():
+  """Locate the 'dump_syms' binary from Breakpad.
+
+  We try to locate the package in the Impala toolchain folder.
+  TODO: Lookup the binary in the system path. Not urgent, since the user can specify the
+  path as a command line switch.
+  """
+  toolchain = os.environ.get('IMPALA_TOOLCHAIN')
+  if toolchain:
+    if not os.path.isdir(toolchain):
+      die('Could not find toolchain directory')
+    breakpad_version = os.environ.get('IMPALA_BREAKPAD_VERSION')
+    if not breakpad_version:
+      die('Could not determine breakpad version from toolchain')
+    breakpad_dir = 'breakpad-%s' % breakpad_version
+    dump_syms = os.path.join(toolchain, breakpad_dir, 'bin', 'dump_syms')
+    if not os.path.isfile(dump_syms):
+      die('Could not find dump_syms executable at %s' % dump_syms)
+    return dump_syms
+  return ''
+
+
+def parse_args():
+  """Parse command line arguments and perform sanity checks."""
+  parser = ArgumentParser()
+  parser.add_argument('-d', '--dest_dir', required=True, help="""The target directory,
+      below which to place extracted symbol files""")
+  parser.add_argument('--dump_syms', help='Path to the dump_syms binary from Breakpad')
+  # Options controlling how to find input files.
+  parser.add_argument('-b', '--build_dir', help="""Path to a directory containing results
+      from an Impala build, e.g. be/build/debug""")
+  parser.add_argument('-f', '--binary_files', nargs='+', metavar="FILE",
+      help='List of binary files to process')
+  parser.add_argument('-i', '--stdin_files', action='store_true', help="""Read the list
+      of files to process from stdin""")
+  parser.add_argument('-r', '--rpm', help="""RPM file containing the binaries to process,
+      use with --debuginfo_rpm""")
+  parser.add_argument('-s', '--debuginfo_rpm', help="""RPM file containing the debug
+      symbols matching the binaries in --rpm""")
+  args = parser.parse_args()
+
+  # Post processing checks
+  # Check that either both rpm and debuginfo_rpm are specified, or none.
+  if bool(args.rpm) != bool(args.debuginfo_rpm):
+    parser.print_usage()
+    die('Either both --rpm and --debuginfo_rpm have to be specified, or none')
+  input_flags = [args.build_dir, args.binary_files, args.stdin_files, args.rpm]
+  if sum(1 for flag in input_flags if flag) != 1:
+    die('You need to specify exactly one way to locate input files (-b/-f/-i/-r,-s)')
+
+  return args
+
+
+def ensure_dir_exists(path):
+  """Make sure the directory 'path' exists in a thread-safe way."""
+  try:
+    os.makedirs(path)
+  except OSError as e:
+    if e.errno != errno.EEXIST or not os.path.isdir(path):
+      raise e
+
+
+def walk_path(path):
+  for dirpath, dirnames, filenames in os.walk(path):
+    for name in filenames:
+      yield os.path.join(dirpath, name)
+
+
+def is_regular_file(path):
+  """Check whether 'path' is a regular file, especially not a symlink."""
+  return os.path.isfile(path) and not os.path.islink(path)
+
+
+def is_elf_file(path):
+  """Check whether 'path' is an ELF file."""
+  return is_regular_file(path) and 'ELF' in magic.from_file(path)
+
+
+def find_elf_files(path):
+  """Walk 'path' and return a generator over all ELF files below."""
+  return (f for f in walk_path(path) if is_elf_file(f))
+
+
+def extract_rpm(rpm, out_dir):
+  """Extract 'rpm' into 'out_dir'."""
+  assert os.path.isdir(out_dir)
+  cmd = 'rpm2cpio %s | cpio -id' % rpm
+  subprocess.check_call(cmd, shell=True, cwd=out_dir)
+
+
+def assert_file_exists(path):
+  if not os.path.isfile(path):
+    die('File does not exists: %s' % path)
+
+
+def enumerate_rpm_files(rpm, debuginfo_rpm):
+  """Return a generator over BinaryDebugInfo tuples for all ELF files in 'rpm'.
+
+  This function extracts both RPM files, then walks the binary rpm directory to enumerate
+  all ELF files, matches them to the location of their respective .debug file and yields
+  all tuples thereof. We use a generator here to keep the temporary directory and its
+  contents around until the consumer of the generator has finished its processing.
+  """
+  IMPALA_BINARY_BASE = os.path.join('usr', 'lib', 'impala')
+  IMPALA_DEBUGINFO_BASE = os.path.join('usr', 'lib', 'debug', IMPALA_BINARY_BASE)
+  assert_file_exists(rpm)
+  assert_file_exists(debuginfo_rpm)
+  tmp_dir = tempfile.mkdtemp()
+  try:
+    # Extract rpm
+    logging.info('Extracting: %s' % rpm)
+    extract_rpm(os.path.abspath(rpm), tmp_dir)
+    # Extract debuginfo_rpm
+    logging.info('Extracting: %s' % debuginfo_rpm)
+    extract_rpm(os.path.abspath(debuginfo_rpm), tmp_dir)
+    # Walk rpm path and find elf files
+    binary_base = os.path.join(tmp_dir, IMPALA_BINARY_BASE)
+    debuginfo_base = os.path.join(tmp_dir, IMPALA_DEBUGINFO_BASE)
+    # Find folder with .debug file in debuginfo_rpm path
+    for binary_path in find_elf_files(binary_base):
+      # Add tuple to output
+      rel_dir = os.path.relpath(os.path.dirname(binary_path), binary_base)
+      debug_dir = os.path.join(debuginfo_base, rel_dir)
+      yield BinaryDebugInfo(binary_path, debug_dir)
+  finally:
+    shutil.rmtree(tmp_dir)
+
+
+def enumerate_binaries(args):
+  """Enumerate all BinaryDebugInfo tuples, from which symbols should be extracted.
+
+  This function returns iterables, either lists or generators.
+  """
+  if args.binary_files:
+    return (BinaryDebugInfo(f, None) for f in args.binary_files)
+  elif args.stdin_files:
+    return (BinaryDebugInfo(f, None) for f in sys.stdin.read().splitlines())
+  elif args.rpm:
+    return enumerate_rpm_files(args.rpm, args.debuginfo_rpm)
+  elif args.build_dir:
+    return (BinaryDebugInfo(f, None) for f in find_elf_files(args.build_dir))
+  die('No input method provided')
+
+
+def process_binary(dump_syms, binary, out_dir):
+  """Dump symbols of a single binary file and move the result.
+
+  Symbols will be extracted to a temporary file and moved into place afterwards. Required
+  directories will be created if necessary.
+  """
+  logging.info('Processing binary file: %s' % binary.path)
+  ensure_dir_exists(out_dir)
+  # tmp_fd will be closed when the file object created by os.fdopen() below gets
+  # destroyed.
+  tmp_fd, tmp_file = tempfile.mkstemp(dir=out_dir, suffix='.sym')
+  try:
+    # Run dump_syms on the binary.
+    args = [dump_syms, binary.path]
+    if binary.debug_path:
+      args.append(binary.debug_path)
+    proc = subprocess.Popen(args, stdout=os.fdopen(tmp_fd, 'wb'), stderr=subprocess.PIPE)
+    _, stderr = proc.communicate()
+    if proc.returncode != 0:
+      sys.stderr.write('Failed to dump symbols from %s, return code %s\n' %
+          (binary.path, proc.returncode))
+      sys.stderr.write(stderr)
+      os.remove(tmp_file)
+      return False
+    # Parse the temporary file to determine the full target path.
+    with open(tmp_file, 'r') as f:
+      header = f.readline().strip()
+      # Format of header is: MODULE os arch binary_id binary
+      _, _, _, binary_id, binary = header.split(' ')
+      out_path = os.path.join(out_dir, binary, binary_id)
+      ensure_dir_exists(out_path)
+    # Move the temporary file to its final destination.
+    shutil.move(tmp_file, os.path.join(out_path, '%s.sym' % binary))
+  except Exception as e:
+    # Only need to clean up in case of errors.
+    try:
+      os.remove(tmp_file)
+    except EnvironmentError:
+      pass
+    raise e
+  return True
+
+
+def main():
+  args = parse_args()
+  dump_syms = args.dump_syms or find_dump_syms_binary()
+  assert dump_syms
+  status = 0
+  ensure_dir_exists(args.dest_dir)
+  for binary in enumerate_binaries(args):
+    if not process_binary(dump_syms, binary, args.dest_dir):
+      status = 1
+  sys.exit(status)
+
+
+if __name__ == '__main__':
+  main()

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/12799fae/infra/python/deps/requirements.txt
----------------------------------------------------------------------
diff --git a/infra/python/deps/requirements.txt b/infra/python/deps/requirements.txt
index 9ed842c..2725344 100644
--- a/infra/python/deps/requirements.txt
+++ b/infra/python/deps/requirements.txt
@@ -46,6 +46,7 @@ pytest == 2.7.2
   py == 1.4.30
 pytest-random == 0.02
 pytest-xdist == 1.12
+python-magic == 0.4.11
 pywebhdfs == 0.3.2
   pbr == 1.8.1
 requests == 2.7.0


[2/4] incubator-impala git commit: Kudu: Fix warnings from clang

Posted by ta...@apache.org.
Kudu: Fix warnings from clang

Changes:
1) Several places in the tests didn't check return statuses.
   KUDU_ASSERT_OK can only be used in functions that return void;
   KUDU_CHECK_OK is used otherwise.
2) The forward-declared "class ColumnType" should actually have been a
   struct.

Now there aren't any more Kudu-related warnings from clang.

Change-Id: Id3e2f5ec9925c3cf81c7f4048decc6a5f97eee66
Reviewed-on: http://gerrit.cloudera.org:8080/3062
Reviewed-by: Dan Hecht <dh...@cloudera.com>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/b634a55b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/b634a55b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/b634a55b

Branch: refs/heads/master
Commit: b634a55b9244d8e3bc9ca79280d1246455b9ed4f
Parents: 12799fa
Author: Casey Ching <ca...@cloudera.com>
Authored: Fri May 13 12:34:53 2016 -0700
Committer: Tim Armstrong <ta...@cloudera.com>
Committed: Tue May 17 01:30:12 2016 -0700

----------------------------------------------------------------------
 be/src/exec/kudu-scan-node-test.cc  | 8 ++++----
 be/src/exec/kudu-table-sink-test.cc | 4 ++--
 be/src/exec/kudu-testutil.h         | 4 ++--
 be/src/exec/kudu-util.h             | 2 +-
 4 files changed, 9 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b634a55b/be/src/exec/kudu-scan-node-test.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-scan-node-test.cc b/be/src/exec/kudu-scan-node-test.cc
index 11a04d8..b7453ee 100644
--- a/be/src/exec/kudu-scan-node-test.cc
+++ b/be/src/exec/kudu-scan-node-test.cc
@@ -224,7 +224,7 @@ class KuduScanNodeTest : public testing::Test {
     string encoded_start_key;
     if (start_key != -1) {
       scoped_ptr<KuduPartialRow> start_key_row(kudu_test_helper_.test_schema().NewRow());
-      start_key_row->SetInt32(0, start_key);
+      KUDU_ASSERT_OK(start_key_row->SetInt32(0, start_key));
       start_key_row->EncodeRowKey(&encoded_start_key);
     } else {
       encoded_start_key = "";
@@ -233,7 +233,7 @@ class KuduScanNodeTest : public testing::Test {
     string encoded_stop_key;
     if (stop_key != -1) {
       gscoped_ptr<KuduPartialRow> stop_key_row(kudu_test_helper_.test_schema().NewRow());
-      stop_key_row->SetInt32(0, stop_key);
+      KUDU_ASSERT_OK(stop_key_row->SetInt32(0, stop_key));
       stop_key_row->EncodeRowKey(&encoded_stop_key);
     } else {
       encoded_stop_key = "";
@@ -476,7 +476,7 @@ TEST_F(KuduScanNodeTest, TestScanEmptyString) {
   gscoped_ptr<KuduInsert> insert(kudu_test_helper_.table()->NewInsert());
   KUDU_ASSERT_OK(insert->mutable_row()->SetInt32(0, 10));
   KUDU_ASSERT_OK(insert->mutable_row()->SetString(2, ""));
-  session->Apply(insert.release());
+  KUDU_ASSERT_OK(session->Apply(insert.release()));
   KUDU_ASSERT_OK(session->Flush());
   ASSERT_FALSE(session->HasPendingOperations());
 
@@ -557,7 +557,7 @@ TEST_F(KuduScanNodeTest, BenchmarkScanNode) {
     for (int i = 1; i < NUM_SPLITS; ++i) {
        int split_key = (NUM_ROWS / NUM_SPLITS) * i;
        KuduPartialRow* row = kudu_test_helper_.test_schema().NewRow();
-       row->SetInt32(0, split_key);
+       KUDU_ASSERT_OK(row->SetInt32(0, split_key));
        split_rows.push_back(row);
     }
     kudu_test_helper_.CreateTable(BASE_TABLE_NAME, &split_rows);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b634a55b/be/src/exec/kudu-table-sink-test.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-table-sink-test.cc b/be/src/exec/kudu-table-sink-test.cc
index 11dfa5e..1d5c642 100644
--- a/be/src/exec/kudu-table-sink-test.cc
+++ b/be/src/exec/kudu-table-sink-test.cc
@@ -178,8 +178,8 @@ class KuduTableSinkTest : public testing::Test {
   void Verify(int num_columns, int expected_num_rows, int factor, string val,
       int skip_val) {
     kudu::client::KuduScanner scanner(kudu_test_helper_.table().get());
-    scanner.SetReadMode(kudu::client::KuduScanner::READ_AT_SNAPSHOT);
-    scanner.SetFaultTolerant();
+    KUDU_ASSERT_OK(scanner.SetReadMode(kudu::client::KuduScanner::READ_AT_SNAPSHOT));
+    KUDU_ASSERT_OK(scanner.SetFaultTolerant());
     KUDU_ASSERT_OK(scanner.Open());
     int row_idx = 0;
     while (scanner.HasMoreRows()) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b634a55b/be/src/exec/kudu-testutil.h
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-testutil.h b/be/src/exec/kudu-testutil.h
index f45d6cd..6849e74 100644
--- a/be/src/exec/kudu-testutil.h
+++ b/be/src/exec/kudu-testutil.h
@@ -134,7 +134,7 @@ class KuduTestHelper {
     for (int i = first_row; i < num_rows + first_row; i++) {
       KUDU_ASSERT_OK(session->Apply(BuildTestRow(table, i, num_cols).release()));
       if (i % 1000 == 0) {
-        session->Flush();
+        KUDU_ASSERT_OK(session->Flush());
       }
     }
     KUDU_ASSERT_OK(session->Flush());
@@ -155,7 +155,7 @@ class KuduTestHelper {
   vector<const KuduPartialRow*> DefaultSplitRows() {
     vector<const KuduPartialRow*> keys;
     KuduPartialRow* key = test_schema_.NewRow();
-    key->SetInt32(0, 5);
+    KUDU_CHECK_OK(key->SetInt32(0, 5));
     keys.push_back(key);
     return keys;
   }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b634a55b/be/src/exec/kudu-util.h
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-util.h b/be/src/exec/kudu-util.h
index 47debdc..e200231 100644
--- a/be/src/exec/kudu-util.h
+++ b/be/src/exec/kudu-util.h
@@ -25,8 +25,8 @@ namespace impala {
 class TExpr;
 class KuduTableDescriptor;
 class Status;
-class ColumnType;
 class TupleDescriptor;
+struct ColumnType;
 
 /// Returns false when running on an operating system that Kudu doesn't support. If this
 /// check fails, there is no way Kudu should be expected to work. Exposed for testing.


[3/4] incubator-impala git commit: IMPALA-3286: Prefetching for PHJ probing.

Posted by ta...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a59408b5/be/src/exec/partitioned-aggregation-node-ir.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-aggregation-node-ir.cc b/be/src/exec/partitioned-aggregation-node-ir.cc
index 45b5a0e..d575c01 100644
--- a/be/src/exec/partitioned-aggregation-node-ir.cc
+++ b/be/src/exec/partitioned-aggregation-node-ir.cc
@@ -53,13 +53,13 @@ Status PartitionedAggregationNode::ProcessBatch(RowBatch* batch,
 template<bool AGGREGATED_ROWS>
 Status PartitionedAggregationNode::ProcessRow(TupleRow* __restrict__ row,
     HashTableCtx* __restrict__ ht_ctx) {
-  uint32_t hash = 0;
   if (AGGREGATED_ROWS) {
-    if (!ht_ctx->EvalAndHashBuild(row, &hash)) return Status::OK();
+    if (!ht_ctx->EvalAndHashBuild(row)) return Status::OK();
   } else {
-    if (!ht_ctx->EvalAndHashProbe(row, &hash)) return Status::OK();
+    if (!ht_ctx->EvalAndHashProbe(row)) return Status::OK();
   }
 
+  uint32_t hash = ht_ctx->expr_values_cache()->ExprValuesHash();
   // To process this row, we first see if it can be aggregated or inserted into this
   // partition's hash table. If we need to insert it and that fails, due to OOM, we
   // spill the partition. The partition to spill is not necessarily dst_partition,
@@ -76,7 +76,7 @@ Status PartitionedAggregationNode::ProcessRow(TupleRow* __restrict__ row,
   bool found;
   // Find the appropriate bucket in the hash table. There will always be a free
   // bucket because we checked the size above.
-  HashTable::Iterator it = ht->FindBuildRowBucket(ht_ctx, hash, &found);
+  HashTable::Iterator it = ht->FindBuildRowBucket(ht_ctx, &found);
   DCHECK(!it.AtEnd()) << "Hash table had no free buckets";
   if (AGGREGATED_ROWS) {
     // If the row is already an aggregate row, it cannot match anything in the
@@ -149,9 +149,9 @@ Status PartitionedAggregationNode::ProcessBatchStreaming(bool needs_serialize,
 
   RowBatch::Iterator out_batch_iterator(out_batch, out_batch->num_rows());
   FOREACH_ROW(in_batch, 0, in_batch_iter) {
-    uint32_t hash;
     TupleRow* in_row = in_batch_iter.Get();
-    if (!ht_ctx->EvalAndHashProbe(in_row, &hash)) continue;
+    if (!ht_ctx->EvalAndHashProbe(in_row)) continue;
+    uint32_t hash = ht_ctx->expr_values_cache()->ExprValuesHash();
     const uint32_t partition_idx = hash >> (32 - NUM_PARTITIONING_BITS);
 
     if (TryAddToHashTable(ht_ctx, hash_partitions_[partition_idx], in_row, hash,
@@ -192,7 +192,7 @@ bool PartitionedAggregationNode::TryAddToHashTable(
   DCHECK_GE(*remaining_capacity, 0);
   bool found;
   // This is called from ProcessBatchStreaming() so the rows are not aggregated.
-  HashTable::Iterator it = partition->hash_tbl->FindBuildRowBucket(ht_ctx, hash, &found);
+  HashTable::Iterator it = partition->hash_tbl->FindBuildRowBucket(ht_ctx, &found);
   Tuple* intermediate_tuple;
   if (found) {
     intermediate_tuple = it.GetTuple();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a59408b5/be/src/exec/partitioned-aggregation-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-aggregation-node.cc b/be/src/exec/partitioned-aggregation-node.cc
index b7dca61..2cde059 100644
--- a/be/src/exec/partitioned-aggregation-node.cc
+++ b/be/src/exec/partitioned-aggregation-node.cc
@@ -252,9 +252,9 @@ Status PartitionedAggregationNode::Prepare(RuntimeState* state) {
     RETURN_IF_ERROR(state_->GetQueryStatus());
     singleton_output_tuple_returned_ = false;
   } else {
-    ht_ctx_.reset(new HashTableCtx(build_expr_ctxs_, grouping_expr_ctxs_, true,
-        std::vector<bool>(build_expr_ctxs_.size(), true), state->fragment_hash_seed(),
-        MAX_PARTITION_DEPTH, 1));
+    RETURN_IF_ERROR(HashTableCtx::Create(state, build_expr_ctxs_, grouping_expr_ctxs_,
+        true, vector<bool>(build_expr_ctxs_.size(), true), state->fragment_hash_seed(),
+        MAX_PARTITION_DEPTH, 1, mem_tracker(), &ht_ctx_));
     RETURN_IF_ERROR(state_->block_mgr()->RegisterClient(
         Substitute("PartitionedAggregationNode id=$0 ptr=$1", id_, this),
         MinRequiredBuffers(), true, mem_tracker(), state, &block_mgr_client_));
@@ -984,9 +984,9 @@ int PartitionedAggregationNode::GroupingExprsVarlenSize() {
   int varlen_size = 0;
   // TODO: The hash table could compute this as it hashes.
   for (int expr_idx: string_grouping_exprs_) {
-    StringValue* sv = reinterpret_cast<StringValue*>(ht_ctx_->last_expr_value(expr_idx));
+    StringValue* sv = reinterpret_cast<StringValue*>(ht_ctx_->ExprValue(expr_idx));
     // Avoid branching by multiplying length by null bit.
-    varlen_size += sv->len * !ht_ctx_->last_expr_value_null(expr_idx);
+    varlen_size += sv->len * !ht_ctx_->ExprValueNull(expr_idx);
   }
   return varlen_size;
 }
@@ -997,17 +997,17 @@ void PartitionedAggregationNode::CopyGroupingValues(Tuple* intermediate_tuple,
   // Copy over all grouping slots (the variable length data is copied below).
   for (int i = 0; i < grouping_expr_ctxs_.size(); ++i) {
     SlotDescriptor* slot_desc = intermediate_tuple_desc_->slots()[i];
-    if (ht_ctx_->last_expr_value_null(i)) {
+    if (ht_ctx_->ExprValueNull(i)) {
       intermediate_tuple->SetNull(slot_desc->null_indicator_offset());
     } else {
-      void* src = ht_ctx_->last_expr_value(i);
+      void* src = ht_ctx_->ExprValue(i);
       void* dst = intermediate_tuple->GetSlot(slot_desc->tuple_offset());
       memcpy(dst, src, slot_desc->slot_size());
     }
   }
 
   for (int expr_idx: string_grouping_exprs_) {
-    if (ht_ctx_->last_expr_value_null(expr_idx)) continue;
+    if (ht_ctx_->ExprValueNull(expr_idx)) continue;
 
     SlotDescriptor* slot_desc = intermediate_tuple_desc_->slots()[expr_idx];
     // ptr and len were already copied to the fixed-len part of string value

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a59408b5/be/src/exec/partitioned-hash-join-node-ir.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-hash-join-node-ir.cc b/be/src/exec/partitioned-hash-join-node-ir.cc
index ef4c010..d66eda0 100644
--- a/be/src/exec/partitioned-hash-join-node-ir.cc
+++ b/be/src/exec/partitioned-hash-join-node-ir.cc
@@ -219,67 +219,145 @@ bool IR_ALWAYS_INLINE PartitionedHashJoinNode::ProcessProbeRowOuterJoins(
 }
 
 template<int const JoinOp>
+bool IR_ALWAYS_INLINE PartitionedHashJoinNode::ProcessProbeRow(
+    ExprContext* const* other_join_conjunct_ctxs, int num_other_join_conjuncts,
+    ExprContext* const* conjunct_ctxs, int num_conjuncts,
+    RowBatch::Iterator* out_batch_iterator, int* remaining_capacity, Status* status) {
+  if (JoinOp == TJoinOp::INNER_JOIN) {
+    return ProcessProbeRowInnerJoin(other_join_conjunct_ctxs, num_other_join_conjuncts,
+        conjunct_ctxs, num_conjuncts, out_batch_iterator, remaining_capacity);
+  } else if (JoinOp == TJoinOp::RIGHT_SEMI_JOIN ||
+             JoinOp == TJoinOp::RIGHT_ANTI_JOIN) {
+    return ProcessProbeRowRightSemiJoins<JoinOp>(other_join_conjunct_ctxs,
+        num_other_join_conjuncts, conjunct_ctxs, num_conjuncts, out_batch_iterator,
+        remaining_capacity);
+  } else if (JoinOp == TJoinOp::LEFT_SEMI_JOIN ||
+             JoinOp == TJoinOp::LEFT_ANTI_JOIN ||
+             JoinOp == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
+    return ProcessProbeRowLeftSemiJoins<JoinOp>(other_join_conjunct_ctxs,
+        num_other_join_conjuncts, conjunct_ctxs, num_conjuncts, out_batch_iterator,
+        remaining_capacity, status);
+  } else {
+    DCHECK(JoinOp == TJoinOp::RIGHT_OUTER_JOIN ||
+           JoinOp == TJoinOp::LEFT_OUTER_JOIN || TJoinOp::FULL_OUTER_JOIN);
+    return ProcessProbeRowOuterJoins<JoinOp>(other_join_conjunct_ctxs,
+        num_other_join_conjuncts, conjunct_ctxs, num_conjuncts, out_batch_iterator,
+        remaining_capacity);
+  }
+}
+
+template<int const JoinOp>
 bool IR_ALWAYS_INLINE PartitionedHashJoinNode::NextProbeRow(
     HashTableCtx* ht_ctx, RowBatch::Iterator* probe_batch_iterator,
-    int* remaining_capacity, int num_other_join_conjuncts, Status* status) {
-  while (!probe_batch_iterator->AtEnd()) {
+    int* remaining_capacity, Status* status) {
+  HashTableCtx::ExprValuesCache* expr_vals_cache = ht_ctx->expr_values_cache();
+  while (!expr_vals_cache->AtEnd()) {
     // Establish current_probe_row_ and find its corresponding partition.
+    DCHECK(!probe_batch_iterator->AtEnd());
     current_probe_row_ = probe_batch_iterator->Get();
-    probe_batch_iterator->Next();
     matched_probe_ = false;
 
-    uint32_t hash;
-    if (!ht_ctx->EvalAndHashProbe(current_probe_row_, &hash)) {
+    // True if the current row should be skipped for probing.
+    bool skip_row = false;
+
+    // The hash of the expressions results for the current probe row.
+    uint32_t hash = expr_vals_cache->ExprValuesHash();
+    // Hoist the followings out of the else statement below to speed up non-null case.
+    const uint32_t partition_idx = hash >> (32 - NUM_PARTITIONING_BITS);
+    HashTable* hash_tbl = hash_tbls_[partition_idx];
+
+    // Fetch the hash and expr values' nullness for this row.
+    if (expr_vals_cache->IsRowNull()) {
       if (JoinOp == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN && non_empty_build_) {
+        const int num_other_join_conjuncts = other_join_conjunct_ctxs_.size();
         // For NAAJ, we need to treat NULLs on the probe carefully. The logic is:
-        // 1. No build rows -> Return this row.
+        // 1. No build rows -> Return this row. The check for 'non_empty_build_'
+        //    is for this case.
         // 2. Has build rows & no other join predicates, skip row.
         // 3. Has build rows & other join predicates, we need to evaluate against all
         // build rows. First evaluate it against this partition, and if there is not
         // a match, save it to evaluate against other partitions later. If there
         // is a match, the row is skipped.
-        if (num_other_join_conjuncts > 0) {
-          if (UNLIKELY(!AppendRow(null_probe_rows_, current_probe_row_, status))) {
+        if (num_other_join_conjuncts == 0) {
+          // Condition 2 above.
+          skip_row = true;
+        } else if (LIKELY(AppendRow(null_probe_rows_, current_probe_row_, status))) {
+          // Condition 3 above.
+          matched_null_probe_.push_back(false);
+          skip_row = true;
+        } else {
+          // Condition 3 above but failed to append to 'null_probe_rows_'. Bail out.
+          DCHECK(!status->ok());
+          return false;
+        }
+      }
+    } else {
+      // The build partition is in memory. Return this row for probing.
+      if (LIKELY(hash_tbl != NULL)) {
+        hash_tbl_iterator_ = hash_tbl->FindProbeRow(ht_ctx);
+      } else {
+        // The build partition is either empty or spilled.
+        Partition* partition = hash_partitions_[partition_idx];
+        // This partition is closed, meaning the build side for this partition was empty.
+        if (UNLIKELY(partition->is_closed())) {
+          DCHECK(state_ == PROCESSING_PROBE || state_ == REPARTITIONING);
+        } else {
+          // This partition is not in memory, spill the probe row and move to the next row.
+          DCHECK(partition->is_spilled());
+          DCHECK(partition->probe_rows() != NULL);
+          // Skip the current row if we manage to append to the spilled partition's BTS.
+          // Otherwise, we need to bail out and report the failure.
+          if (UNLIKELY(!AppendRow(partition->probe_rows(), current_probe_row_, status))) {
             DCHECK(!status->ok());
             return false;
           }
-          matched_null_probe_.push_back(false);
+          skip_row = true;
         }
-        continue;
       }
-      return true;
-    }
-    const uint32_t partition_idx = hash >> (32 - NUM_PARTITIONING_BITS);
-    // The build partition is in memory. Return this row for probing.
-    if (LIKELY(hash_tbls_[partition_idx] != NULL)) {
-      hash_tbl_iterator_ = hash_tbls_[partition_idx]->FindProbeRow(ht_ctx, hash);
-      return true;
-    }
-    // The build partition is either empty or spilled.
-    Partition* partition = hash_partitions_[partition_idx];
-    // This partition is closed, meaning the build side for this partition was empty.
-    if (UNLIKELY(partition->is_closed())) {
-      DCHECK(state_ == PROCESSING_PROBE || state_ == REPARTITIONING);
-      return true;
-    }
-    // This partition is not in memory, spill the probe row and move to the next row.
-    DCHECK(partition->is_spilled());
-    DCHECK(partition->probe_rows() != NULL);
-    if (UNLIKELY(!AppendRow(partition->probe_rows(), current_probe_row_, status))) {
-      DCHECK(!status->ok());
-      return false;
     }
+    // Move to the next probe row and hash table context's cached value.
+    probe_batch_iterator->Next();
+    expr_vals_cache->NextRow();
+    if (skip_row) continue;
+    DCHECK(status->ok());
+    return true;
+  }
+  if (probe_batch_iterator->AtEnd()) {
+    // No more probe row.
+    current_probe_row_ = NULL;
   }
-  // Finished this batch.
-  current_probe_row_ = NULL;
   return false;
 }
 
-// CreateOutputRow, EvalOtherJoinConjuncts, and EvalConjuncts are replaced by
-// codegen.
+void IR_ALWAYS_INLINE PartitionedHashJoinNode::EvalAndHashProbePrefetchGroup(
+    TPrefetchMode::type prefetch_mode, HashTableCtx* ht_ctx) {
+  RowBatch* probe_batch = probe_batch_.get();
+  HashTableCtx::ExprValuesCache* expr_vals_cache = ht_ctx->expr_values_cache();
+  const int prefetch_size = expr_vals_cache->capacity();
+  DCHECK(expr_vals_cache->AtEnd());
+
+  expr_vals_cache->Reset();
+  FOREACH_ROW_LIMIT(probe_batch, probe_batch_pos_, prefetch_size, batch_iter) {
+    TupleRow* row = batch_iter.Get();
+    if (ht_ctx->EvalAndHashProbe(row)) {
+      if (prefetch_mode != TPrefetchMode::NONE) {
+        uint32_t hash = expr_vals_cache->ExprValuesHash();
+        const uint32_t partition_idx = hash >> (32 - NUM_PARTITIONING_BITS);
+        HashTable* hash_tbl = hash_tbls_[partition_idx];
+        if (LIKELY(hash_tbl != NULL)) hash_tbl->PrefetchBucket<true>(hash);
+      }
+    } else {
+      expr_vals_cache->SetRowNull();
+    }
+    expr_vals_cache->NextRow();
+  }
+  expr_vals_cache->ResetForRead();
+}
+
+// CreateOutputRow, EvalOtherJoinConjuncts, and EvalConjuncts are replaced by codegen.
 template<int const JoinOp>
-int PartitionedHashJoinNode::ProcessProbeBatch(RowBatch* out_batch,
-    HashTableCtx* __restrict__ ht_ctx, Status* __restrict__ status) {
+int PartitionedHashJoinNode::ProcessProbeBatch(TPrefetchMode::type prefetch_mode,
+    RowBatch* out_batch, HashTableCtx* __restrict__ ht_ctx, Status* __restrict__ status) {
   ExprContext* const* other_join_conjunct_ctxs = &other_join_conjunct_ctxs_[0];
   const int num_other_join_conjuncts = other_join_conjunct_ctxs_.size();
   ExprContext* const* conjunct_ctxs = &conjunct_ctxs_[0];
@@ -292,48 +370,51 @@ int PartitionedHashJoinNode::ProcessProbeBatch(RowBatch* out_batch,
   // Note that 'probe_batch_pos_' is the row no. of the row after 'current_probe_row_'.
   RowBatch::Iterator probe_batch_iterator(probe_batch_.get(), probe_batch_pos_);
   int remaining_capacity = max_rows;
+  bool has_probe_rows = current_probe_row_ != NULL || !probe_batch_iterator.AtEnd();
+  HashTableCtx::ExprValuesCache* expr_vals_cache = ht_ctx->expr_values_cache();
 
-  do {
-    if (current_probe_row_ != NULL) {
-      if (JoinOp == TJoinOp::INNER_JOIN) {
-        if (!ProcessProbeRowInnerJoin(other_join_conjunct_ctxs, num_other_join_conjuncts,
-            conjunct_ctxs, num_conjuncts, &out_batch_iterator, &remaining_capacity)) {
-          break;
-        }
-      } else if (JoinOp == TJoinOp::RIGHT_SEMI_JOIN ||
-                 JoinOp == TJoinOp::RIGHT_ANTI_JOIN) {
-        if (!ProcessProbeRowRightSemiJoins<JoinOp>(other_join_conjunct_ctxs,
-            num_other_join_conjuncts, conjunct_ctxs, num_conjuncts, &out_batch_iterator,
-            &remaining_capacity)) {
-          break;
-        }
-      } else if (JoinOp == TJoinOp::LEFT_SEMI_JOIN ||
-                 JoinOp == TJoinOp::LEFT_ANTI_JOIN ||
-                 JoinOp == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
-        if (!ProcessProbeRowLeftSemiJoins<JoinOp>(other_join_conjunct_ctxs,
-            num_other_join_conjuncts, conjunct_ctxs, num_conjuncts, &out_batch_iterator,
-            &remaining_capacity, status)) {
-          break;
-        }
-      } else {
-        DCHECK(JoinOp == TJoinOp::RIGHT_OUTER_JOIN ||
-            JoinOp == TJoinOp::LEFT_OUTER_JOIN || TJoinOp::FULL_OUTER_JOIN);
-        if (!ProcessProbeRowOuterJoins<JoinOp>(other_join_conjunct_ctxs,
-            num_other_join_conjuncts, conjunct_ctxs, num_conjuncts, &out_batch_iterator,
-            &remaining_capacity)) {
+  // Keep processing more probe rows if there are more to process and the output batch
+  // has room and we haven't hit any error yet.
+  while (has_probe_rows && remaining_capacity > 0 && status->ok()) {
+    // Prefetch for the current hash_tbl_iterator_.
+    if (prefetch_mode != TPrefetchMode::NONE) {
+      hash_tbl_iterator_.PrefetchBucket<true>();
+    }
+    // Evaluate and hash more rows if prefetch group is empty. A prefetch group is a cache
+    // of probe expressions results, nullness of the expression values and hash values
+    // against some consecutive number of rows in the probe batch. Prefetching, if
+    // enabled, is interleaved with the rows' evaluation and hashing. If the prefetch
+    // group is partially full (e.g. we returned before the current prefetch group was
+    // exhausted in the previous iteration), we will proceed with the remaining items in
+    // the values cache.
+    if (expr_vals_cache->AtEnd()) {
+      EvalAndHashProbePrefetchGroup(prefetch_mode, ht_ctx);
+    }
+    // Process the prefetch group.
+    do {
+      // 'current_probe_row_' can be NULL on the first iteration through this loop.
+      if (current_probe_row_ != NULL) {
+        if (!ProcessProbeRow<JoinOp>(other_join_conjunct_ctxs, num_other_join_conjuncts,
+            conjunct_ctxs, num_conjuncts, &out_batch_iterator, &remaining_capacity,
+            status)) {
+          if (status->ok()) DCHECK_EQ(remaining_capacity, 0);
           break;
         }
       }
-    }
-    // Must have reached the end of the hash table iterator for the current row before
-    // moving to the next row.
-    DCHECK(hash_tbl_iterator_.AtEnd());
-    DCHECK(status->ok());
-  } while (NextProbeRow<JoinOp>(ht_ctx, &probe_batch_iterator, &remaining_capacity,
-      num_other_join_conjuncts, status));
-  // Update where we are in the probe batch.
-  probe_batch_pos_ = (probe_batch_iterator.Get() - probe_batch_->GetRow(0)) /
-      probe_batch_->num_tuples_per_row();
+      // Must have reached the end of the hash table iterator for the current row before
+      // moving to the next row.
+      DCHECK(hash_tbl_iterator_.AtEnd());
+      DCHECK(status->ok());
+    } while (NextProbeRow<JoinOp>(ht_ctx, &probe_batch_iterator, &remaining_capacity,
+        status));
+    // Update whether there are more probe rows to process in the current batch.
+    has_probe_rows = current_probe_row_ != NULL;
+    if (!has_probe_rows) DCHECK(probe_batch_iterator.AtEnd());
+    // Update where we are in the probe batch.
+    probe_batch_pos_ = (probe_batch_iterator.Get() - probe_batch_->GetRow(0)) /
+        probe_batch_->num_tuples_per_row();
+  }
+
   int num_rows_added;
   if (LIKELY(status->ok())) {
     num_rows_added = max_rows - remaining_capacity;
@@ -346,42 +427,14 @@ int PartitionedHashJoinNode::ProcessProbeBatch(RowBatch* out_batch,
   return num_rows_added;
 }
 
-int PartitionedHashJoinNode::ProcessProbeBatch(
-    const TJoinOp::type join_op, RowBatch* out_batch,
-    HashTableCtx* __restrict__ ht_ctx, Status* __restrict__ status) {
-  switch (join_op) {
-    case TJoinOp::INNER_JOIN:
-      return ProcessProbeBatch<TJoinOp::INNER_JOIN>(out_batch, ht_ctx, status);
-    case TJoinOp::LEFT_OUTER_JOIN:
-      return ProcessProbeBatch<TJoinOp::LEFT_OUTER_JOIN>(out_batch, ht_ctx, status);
-    case TJoinOp::LEFT_SEMI_JOIN:
-      return ProcessProbeBatch<TJoinOp::LEFT_SEMI_JOIN>(out_batch, ht_ctx, status);
-    case TJoinOp::LEFT_ANTI_JOIN:
-      return ProcessProbeBatch<TJoinOp::LEFT_ANTI_JOIN>(out_batch, ht_ctx, status);
-    case TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN:
-      return ProcessProbeBatch<TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN>(out_batch, ht_ctx,
-          status);
-    case TJoinOp::RIGHT_OUTER_JOIN:
-      return ProcessProbeBatch<TJoinOp::RIGHT_OUTER_JOIN>(out_batch, ht_ctx, status);
-    case TJoinOp::RIGHT_SEMI_JOIN:
-      return ProcessProbeBatch<TJoinOp::RIGHT_SEMI_JOIN>(out_batch, ht_ctx, status);
-    case TJoinOp::RIGHT_ANTI_JOIN:
-      return ProcessProbeBatch<TJoinOp::RIGHT_ANTI_JOIN>(out_batch, ht_ctx, status);
-    case TJoinOp::FULL_OUTER_JOIN:
-      return ProcessProbeBatch<TJoinOp::FULL_OUTER_JOIN>(out_batch, ht_ctx, status);
-    default:
-      DCHECK(false) << "Unknown join type";
-      return -1;
-  }
-}
-
 Status PartitionedHashJoinNode::ProcessBuildBatch(RowBatch* build_batch,
     bool build_filters) {
+  HashTableCtx::ExprValuesCache* expr_vals_cache = ht_ctx_->expr_values_cache();
+  expr_vals_cache->Reset();
   FOREACH_ROW(build_batch, 0, build_batch_iter) {
     DCHECK(build_status_.ok());
-    uint32_t hash;
     TupleRow* build_row = build_batch_iter.Get();
-    if (!ht_ctx_->EvalAndHashBuild(build_row, &hash)) {
+    if (!ht_ctx_->EvalAndHashBuild(build_row)) {
       if (null_aware_partition_ != NULL) {
         // TODO: remove with codegen/template
         // If we are NULL aware and this build row has NULL in the eq join slot,
@@ -405,6 +458,7 @@ Status PartitionedHashJoinNode::ProcessBuildBatch(RowBatch* build_batch,
         ctx.local_bloom_filter->Insert(filter_hash);
       }
     }
+    const uint32_t hash = expr_vals_cache->ExprValuesHash();
     const uint32_t partition_idx = hash >> (32 - NUM_PARTITIONING_BITS);
     Partition* partition = hash_partitions_[partition_idx];
     const bool result = AppendRow(partition->build_rows(), build_row, &build_status_);
@@ -413,37 +467,69 @@ Status PartitionedHashJoinNode::ProcessBuildBatch(RowBatch* build_batch,
   return Status::OK();
 }
 
-bool PartitionedHashJoinNode::Partition::InsertBatch(HashTableCtx* ht_ctx,
-    RowBatch* batch, const vector<BufferedTupleStream::RowIdx>& indices) {
-  DCHECK_LE(batch->num_rows(), hash_values_.size());
-  DCHECK_LE(batch->num_rows(), null_bitmap_.num_bits());
+bool PartitionedHashJoinNode::Partition::InsertBatch(
+    TPrefetchMode::type prefetch_mode, HashTableCtx* ht_ctx, RowBatch* batch,
+    const vector<BufferedTupleStream::RowIdx>& indices) {
   // Compute the hash values and prefetch the hash table buckets.
-  int i = 0;
-  uint32_t* hash_values = hash_values_.data();
-  null_bitmap_.SetAllBits(false);
-  FOREACH_ROW(batch, 0, batch_iter) {
-    if (ht_ctx->EvalAndHashBuild(batch_iter.Get(), &hash_values[i])) {
-      // TODO: Find the optimal prefetch batch size. This may be something
-      // processor dependent so we may need calibration at Impala startup time.
-      hash_tbl_->PrefetchBucket<false>(hash_values[i]);
-    } else {
-      null_bitmap_.Set<false>(i, true);
+  const int num_rows = batch->num_rows();
+  HashTableCtx::ExprValuesCache* expr_vals_cache = ht_ctx->expr_values_cache();
+  const int prefetch_size = expr_vals_cache->capacity();  // Rows per prefetch group.
+  const BufferedTupleStream::RowIdx* row_indices = indices.data();
+  for (int prefetch_group_row = 0; prefetch_group_row < num_rows;
+       prefetch_group_row += prefetch_size) {  // One prefetch group per iteration.
+    int cur_row = prefetch_group_row;  // Batch row index; advanced in the insert loop.
+    expr_vals_cache->Reset();  // Start filling a new prefetch group.
+    FOREACH_ROW_LIMIT(batch, cur_row, prefetch_size, batch_iter) {
+      if (ht_ctx->EvalAndHashBuild(batch_iter.Get())) {
+        if (prefetch_mode != TPrefetchMode::NONE) {
+          hash_tbl_->PrefetchBucket<false>(expr_vals_cache->ExprValuesHash());
+        }
+      } else {
+        expr_vals_cache->SetRowNull();
+      }
+      expr_vals_cache->NextRow();
     }
-    ++i;
-  }
-  // Do the insertion.
-  i = 0;
-  const BufferedTupleStream::RowIdx* row_idx = indices.data();
-  FOREACH_ROW(batch, 0, batch_iter) {
-    if (LIKELY(!null_bitmap_.Get<false>(i))) {
+    // Insert the group's rows, skipping rows marked by SetRowNull() above.
+    expr_vals_cache->ResetForRead();  // Rewind to the group's first cached row.
+    FOREACH_ROW_LIMIT(batch, cur_row, prefetch_size, batch_iter) {
       TupleRow* row = batch_iter.Get();
-      if (UNLIKELY(!hash_tbl_->Insert(ht_ctx, row_idx[i], row, hash_values[i]))) {
+      BufferedTupleStream::RowIdx row_idx = row_indices[cur_row];
+      if (!expr_vals_cache->IsRowNull() &&
+          UNLIKELY(!hash_tbl_->Insert(ht_ctx, row_idx, row))) {
         return false;
       }
+      expr_vals_cache->NextRow();
+      ++cur_row;
     }
-    ++i;
   }
   return true;
 }
 
+template int PartitionedHashJoinNode::ProcessProbeBatch<TJoinOp::INNER_JOIN>(
+    TPrefetchMode::type prefetch_mode, RowBatch* out_batch, HashTableCtx* ht_ctx,
+    Status* status);
+template int PartitionedHashJoinNode::ProcessProbeBatch<TJoinOp::LEFT_OUTER_JOIN>(
+    TPrefetchMode::type prefetch_mode, RowBatch* out_batch, HashTableCtx* ht_ctx,
+    Status* status);
+template int PartitionedHashJoinNode::ProcessProbeBatch<TJoinOp::LEFT_SEMI_JOIN>(
+    TPrefetchMode::type prefetch_mode, RowBatch* out_batch, HashTableCtx* ht_ctx,
+    Status* status);
+template int PartitionedHashJoinNode::ProcessProbeBatch<TJoinOp::LEFT_ANTI_JOIN>(
+    TPrefetchMode::type prefetch_mode, RowBatch* out_batch, HashTableCtx* ht_ctx,
+    Status* status);
+template int PartitionedHashJoinNode::ProcessProbeBatch<
+    TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN>(TPrefetchMode::type prefetch_mode,
+    RowBatch* out_batch, HashTableCtx* ht_ctx, Status* status);
+template int PartitionedHashJoinNode::ProcessProbeBatch<TJoinOp::RIGHT_OUTER_JOIN>(
+    TPrefetchMode::type prefetch_mode, RowBatch* out_batch, HashTableCtx* ht_ctx,
+    Status* status);
+template int PartitionedHashJoinNode::ProcessProbeBatch<TJoinOp::RIGHT_SEMI_JOIN>(
+    TPrefetchMode::type prefetch_mode, RowBatch* out_batch, HashTableCtx* ht_ctx,
+    Status* status);
+template int PartitionedHashJoinNode::ProcessProbeBatch<TJoinOp::RIGHT_ANTI_JOIN>(
+    TPrefetchMode::type prefetch_mode, RowBatch* out_batch, HashTableCtx* ht_ctx,
+    Status* status);
+template int PartitionedHashJoinNode::ProcessProbeBatch<TJoinOp::FULL_OUTER_JOIN>(
+    TPrefetchMode::type prefetch_mode, RowBatch* out_batch, HashTableCtx* ht_ctx,
+    Status* status);
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a59408b5/be/src/exec/partitioned-hash-join-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-hash-join-node.cc b/be/src/exec/partitioned-hash-join-node.cc
index 47c118a..1888e06 100644
--- a/be/src/exec/partitioned-hash-join-node.cc
+++ b/be/src/exec/partitioned-hash-join-node.cc
@@ -143,7 +143,9 @@ Status PartitionedHashJoinNode::Prepare(RuntimeState* state) {
   // Although ConstructBuildSide() maybe be run in a separate thread, it is safe to free
   // local allocations in QueryMaintenance() since the build thread is not run
   // concurrently with other expr evaluation in this join node.
-  AddExprCtxsToFree(probe_expr_ctxs_);
+  // Probe side expr is not included in QueryMaintenance(). We cache the probe expression
+  // values in ExprValuesCache. Local allocations need to survive until the cache is reset
+  // so we need to manually free probe expr local allocations.
   AddExprCtxsToFree(build_expr_ctxs_);
 
   // other_join_conjunct_ctxs_ are evaluated in the context of rows assembled from all
@@ -162,10 +164,10 @@ Status PartitionedHashJoinNode::Prepare(RuntimeState* state) {
       join_op_ == TJoinOp::RIGHT_ANTI_JOIN || join_op_ == TJoinOp::FULL_OUTER_JOIN ||
       std::accumulate(is_not_distinct_from_.begin(), is_not_distinct_from_.end(), false,
                       std::logical_or<bool>());
-  ht_ctx_.reset(new HashTableCtx(build_expr_ctxs_, probe_expr_ctxs_, should_store_nulls,
-      is_not_distinct_from_, state->fragment_hash_seed(), MAX_PARTITION_DEPTH,
-      child(1)->row_desc().tuple_descriptors().size()));
-
+  RETURN_IF_ERROR(HashTableCtx::Create(state, build_expr_ctxs_, probe_expr_ctxs_,
+      should_store_nulls, is_not_distinct_from_, state->fragment_hash_seed(),
+      MAX_PARTITION_DEPTH, child(1)->row_desc().tuple_descriptors().size(), mem_tracker(),
+      &ht_ctx_));
   if (join_op_ == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
     null_aware_eval_timer_ = ADD_TIMER(runtime_profile(), "NullAwareAntiJoinEvalTime");
   }
@@ -206,20 +208,20 @@ Status PartitionedHashJoinNode::Prepare(RuntimeState* state) {
         ht_ctx_->CodegenHashCurrentRow(state, true, &murmur_hash_fn));
 
     // Codegen for evaluating build rows
-    Function* eval_row_fn;
-    codegen_status.MergeStatus(ht_ctx_->CodegenEvalRow(state, true, &eval_row_fn));
+    Function* eval_build_row_fn;
+    codegen_status.MergeStatus(ht_ctx_->CodegenEvalRow(state, true, &eval_build_row_fn));
 
     if (codegen_status.ok()) {
       // Codegen for build path
       build_codegen_status =
-          CodegenProcessBuildBatch(state, hash_fn, murmur_hash_fn, eval_row_fn);
+          CodegenProcessBuildBatch(state, hash_fn, murmur_hash_fn, eval_build_row_fn);
       if (build_codegen_status.ok()) build_codegen_enabled = true;
       // Codegen for probe path
       probe_codegen_status = CodegenProcessProbeBatch(state, hash_fn, murmur_hash_fn);
       if (probe_codegen_status.ok()) probe_codegen_enabled = true;
       // Codegen for InsertBatch()
       insert_codegen_status = CodegenInsertBatch(state, hash_fn, murmur_hash_fn,
-          eval_row_fn);
+          eval_build_row_fn);
       if (insert_codegen_status.ok()) ht_construction_codegen_enabled = true;
     } else {
       build_codegen_status = codegen_status;
@@ -324,8 +326,7 @@ PartitionedHashJoinNode::Partition::Partition(RuntimeState* state,
   : parent_(parent),
     is_closed_(false),
     is_spilled_(false),
-    level_(level),
-    null_bitmap_(state->batch_size()) {
+    level_(level) {
   build_rows_ = new BufferedTupleStream(state, parent_->child(1)->row_desc(),
       state->block_mgr(), parent_->block_mgr_client_,
       true /* use_initial_small_buffers */, false /* read_write */);
@@ -334,8 +335,6 @@ PartitionedHashJoinNode::Partition::Partition(RuntimeState* state,
       state->block_mgr(), parent_->block_mgr_client_,
       true /* use_initial_small_buffers */, false /* read_write */ );
   DCHECK(probe_rows_ != NULL);
-  hash_values_.resize(state->batch_size());
-  null_bitmap_.SetAllBits(false);
 }
 
 PartitionedHashJoinNode::Partition::~Partition() {
@@ -465,6 +464,7 @@ Status PartitionedHashJoinNode::Partition::BuildHashTable(RuntimeState* state,
     DCHECK_EQ(batch.num_rows(), indices.size());
     DCHECK_LE(batch.num_rows(), hash_tbl_->EmptyBuckets())
         << build_rows()->RowConsumesMemory();
+    TPrefetchMode::type prefetch_mode = state->query_options().prefetch_mode;
     SCOPED_TIMER(parent_->build_timer_);
     if (parent_->insert_batch_fn_ != NULL) {
       InsertBatchFn insert_batch_fn;
@@ -474,9 +474,13 @@ Status PartitionedHashJoinNode::Partition::BuildHashTable(RuntimeState* state,
         insert_batch_fn = parent_->insert_batch_fn_;
       }
       DCHECK(insert_batch_fn != NULL);
-      if (UNLIKELY(!insert_batch_fn(this, ctx, &batch, indices))) goto not_built;
+      if (UNLIKELY(!insert_batch_fn(this, prefetch_mode, ctx, &batch, indices))) {
+        goto not_built;
+      }
     } else {
-      if (UNLIKELY(!InsertBatch(ctx, &batch, indices))) goto not_built;
+      if (UNLIKELY(!InsertBatch(prefetch_mode, ctx, &batch, indices))) {
+        goto not_built;
+      }
     }
     RETURN_IF_ERROR(state->GetQueryStatus());
     parent_->FreeLocalAllocations();
@@ -652,7 +656,6 @@ Status PartitionedHashJoinNode::ProcessBuildInput(RuntimeState* state, int level
     DCHECK(new_partition != NULL);
     hash_partitions_.push_back(partition_pool_->Add(new_partition));
     RETURN_IF_ERROR(new_partition->build_rows()->Init(id(), runtime_profile(), true));
-
     // Initialize a buffer for the probe here to make sure why have it if we need it.
     // While this is not strictly necessary (there are some cases where we won't need this
     // buffer), the benefit is low. Not grabbing this buffer means there is an additional
@@ -671,6 +674,8 @@ Status PartitionedHashJoinNode::ProcessBuildInput(RuntimeState* state, int level
   while (!eos) {
     RETURN_IF_CANCELLED(state);
     RETURN_IF_ERROR(QueryMaintenance(state));
+    // 'probe_expr_ctxs_' should have made no local allocations in this function.
+    DCHECK(!ExprContext::HasLocalAllocations(probe_expr_ctxs_));
     if (input_partition_ == NULL) {
       // If we are still consuming batches from the build side.
       {
@@ -887,6 +892,43 @@ int64_t PartitionedHashJoinNode::LargestSpilledPartition() const {
   return max_rows;
 }
 
+int PartitionedHashJoinNode::ProcessProbeBatch(const TJoinOp::type join_op,
+    TPrefetchMode::type prefetch_mode, RowBatch* out_batch, HashTableCtx* ht_ctx,
+    Status* status) {
+  switch (join_op) {
+    case TJoinOp::INNER_JOIN:
+      return ProcessProbeBatch<TJoinOp::INNER_JOIN>(
+          prefetch_mode, out_batch, ht_ctx, status);
+    case TJoinOp::LEFT_OUTER_JOIN:
+      return ProcessProbeBatch<TJoinOp::LEFT_OUTER_JOIN>(
+          prefetch_mode, out_batch, ht_ctx, status);
+    case TJoinOp::LEFT_SEMI_JOIN:
+      return ProcessProbeBatch<TJoinOp::LEFT_SEMI_JOIN>(
+          prefetch_mode, out_batch, ht_ctx, status);
+    case TJoinOp::LEFT_ANTI_JOIN:
+      return ProcessProbeBatch<TJoinOp::LEFT_ANTI_JOIN>(
+          prefetch_mode, out_batch, ht_ctx, status);
+    case TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN:
+      return ProcessProbeBatch<TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN>(
+          prefetch_mode, out_batch, ht_ctx, status);
+    case TJoinOp::RIGHT_OUTER_JOIN:
+      return ProcessProbeBatch<TJoinOp::RIGHT_OUTER_JOIN>(
+          prefetch_mode, out_batch, ht_ctx, status);
+    case TJoinOp::RIGHT_SEMI_JOIN:
+      return ProcessProbeBatch<TJoinOp::RIGHT_SEMI_JOIN>(
+          prefetch_mode, out_batch, ht_ctx, status);
+    case TJoinOp::RIGHT_ANTI_JOIN:
+      return ProcessProbeBatch<TJoinOp::RIGHT_ANTI_JOIN>(
+          prefetch_mode, out_batch, ht_ctx, status);
+    case TJoinOp::FULL_OUTER_JOIN:
+      return ProcessProbeBatch<TJoinOp::FULL_OUTER_JOIN>(
+          prefetch_mode, out_batch, ht_ctx, status);
+    default: break;
+  }
+  DCHECK(false) << "Unknown join type";
+  return -1;
+}
+
 Status PartitionedHashJoinNode::GetNext(RuntimeState* state, RowBatch* out_batch,
     bool* eos) {
   SCOPED_TIMER(runtime_profile_->total_time_counter());
@@ -951,16 +993,19 @@ Status PartitionedHashJoinNode::GetNext(RuntimeState* state, RowBatch* out_batch
       // Putting SCOPED_TIMER in ProcessProbeBatch() causes weird exception handling IR
       // in the xcompiled function, so call it here instead.
       int rows_added = 0;
+      TPrefetchMode::type prefetch_mode = state->query_options().prefetch_mode;
       SCOPED_TIMER(probe_timer_);
       if (process_probe_batch_fn_ == NULL) {
-        rows_added = ProcessProbeBatch(join_op_, out_batch, ht_ctx_.get(), &status);
+        rows_added = ProcessProbeBatch(join_op_, prefetch_mode, out_batch, ht_ctx_.get(),
+            &status);
       } else {
         DCHECK(process_probe_batch_fn_level0_ != NULL);
         if (ht_ctx_->level() == 0) {
-          rows_added = process_probe_batch_fn_level0_(this, out_batch, ht_ctx_.get(),
-              &status);
+          rows_added = process_probe_batch_fn_level0_(this, prefetch_mode, out_batch,
+              ht_ctx_.get(), &status);
         } else {
-          rows_added = process_probe_batch_fn_(this, out_batch, ht_ctx_.get(), &status);
+          rows_added = process_probe_batch_fn_(this, prefetch_mode, out_batch,
+              ht_ctx_.get(), &status);
         }
       }
       if (UNLIKELY(rows_added < 0)) {
@@ -982,6 +1027,10 @@ Status PartitionedHashJoinNode::GetNext(RuntimeState* state, RowBatch* out_batch
     } else {
       RETURN_IF_ERROR(NextSpilledProbeRowBatch(state, out_batch));
     }
+    // Free local allocations of the probe side expressions only after ExprValuesCache
+    // has been reset.
+    DCHECK(ht_ctx_->expr_values_cache()->AtEnd());
+    ExprContext::FreeLocalAllocations(probe_expr_ctxs_);
 
     // We want to return as soon as we have attached a tuple stream to the out_batch
     // (before preparing a new partition). The attached tuple stream will be recycled
@@ -1615,6 +1664,7 @@ Status PartitionedHashJoinNode::CodegenProcessBuildBatch(RuntimeState* state,
       codegen->CloneFunction(process_build_batch_fn);
 
   // Always build runtime filters at level0 (if there are any).
+  // Note that the first argument of this function is the return value.
   Value* build_filters_l0_arg = codegen->GetArgument(process_build_batch_fn_level0, 3);
   build_filters_l0_arg->replaceAllUsesWith(
       ConstantInt::get(Type::getInt1Ty(codegen->context()), filters_.size() > 0));
@@ -1630,7 +1680,8 @@ Status PartitionedHashJoinNode::CodegenProcessBuildBatch(RuntimeState* state,
   DCHECK_EQ(replaced, 1);
 
   // Never build filters after repartitioning, as all rows have already been added to the
-  // filters during the level0 build.
+  // filters during the level0 build. Note that the first argument of this function is the
+  // return value.
   Value* build_filters_arg = codegen->GetArgument(process_build_batch_fn, 3);
   build_filters_arg->replaceAllUsesWith(
       ConstantInt::get(Type::getInt1Ty(codegen->context()), false));
@@ -1698,21 +1749,26 @@ Status PartitionedHashJoinNode::CodegenProcessProbeBatch(
   DCHECK(process_probe_batch_fn != NULL);
   process_probe_batch_fn->setName("ProcessProbeBatch");
 
-  // Since ProcessProbeBatch() is a templated function, it has linkonce_odr linkage, which
-  // means the function can be removed if it's not referenced. Change to weak_odr, which
-  // has the same semantics except it can't be removed.
-  // See http://llvm.org/docs/LangRef.html#linkage-types
-  DCHECK(process_probe_batch_fn->getLinkage() == GlobalValue::LinkOnceODRLinkage)
+  // Verifies that ProcessProbeBatch() has weak_odr linkage so it's not discarded even
+  // if it's not referenced. See http://llvm.org/docs/LangRef.html#linkage-types
+  DCHECK(process_probe_batch_fn->getLinkage() == GlobalValue::WeakODRLinkage)
       << LlvmCodeGen::Print(process_probe_batch_fn);
-  process_probe_batch_fn->setLinkage(GlobalValue::WeakODRLinkage);
 
   // Bake in %this pointer argument to process_probe_batch_fn.
   Value* this_arg = codegen->GetArgument(process_probe_batch_fn, 0);
   Value* this_loc = codegen->CastPtrToLlvmPtr(this_arg->getType(), this);
   this_arg->replaceAllUsesWith(this_loc);
 
+  // Replace the parameter 'prefetch_mode' with constant.
+  Value* prefetch_mode_arg = codegen->GetArgument(process_probe_batch_fn, 1);
+  TPrefetchMode::type prefetch_mode = state->query_options().prefetch_mode;
+  DCHECK_GE(prefetch_mode, TPrefetchMode::NONE);
+  DCHECK_LE(prefetch_mode, TPrefetchMode::HT_BUCKET);
+  prefetch_mode_arg->replaceAllUsesWith(
+      ConstantInt::get(Type::getInt32Ty(codegen->context()), prefetch_mode));
+
   // Bake in %ht_ctx pointer argument to process_probe_batch_fn
-  Value* ht_ctx_arg = codegen->GetArgument(process_probe_batch_fn, 2);
+  Value* ht_ctx_arg = codegen->GetArgument(process_probe_batch_fn, 3);
   Value* ht_ctx_loc = codegen->CastPtrToLlvmPtr(ht_ctx_arg->getType(), ht_ctx_.get());
   ht_ctx_arg->replaceAllUsesWith(ht_ctx_loc);
 
@@ -1823,9 +1879,17 @@ Status PartitionedHashJoinNode::CodegenInsertBatch(RuntimeState* state,
   Function* build_equals_fn;
   RETURN_IF_ERROR(ht_ctx_->CodegenEquals(state, true, &build_equals_fn));
 
+  // Replace the parameter 'prefetch_mode' with constant.
+  Value* prefetch_mode_arg = codegen->GetArgument(insert_batch_fn, 1);
+  TPrefetchMode::type prefetch_mode = state->query_options().prefetch_mode;
+  DCHECK_GE(prefetch_mode, TPrefetchMode::NONE);
+  DCHECK_LE(prefetch_mode, TPrefetchMode::HT_BUCKET);
+  prefetch_mode_arg->replaceAllUsesWith(
+      ConstantInt::get(Type::getInt32Ty(codegen->context()), prefetch_mode));
+
   // Use codegen'd EvalBuildRow() function
   int replaced = codegen->ReplaceCallSites(insert_batch_fn, eval_row_fn, "EvalBuildRow");
-  DCHECK_EQ(replaced, 2);
+  DCHECK_EQ(replaced, 1);
 
   // Use codegen'd Equals() function
   replaced = codegen->ReplaceCallSites(insert_batch_fn, build_equals_fn, "Equals");

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a59408b5/be/src/exec/partitioned-hash-join-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-hash-join-node.h b/be/src/exec/partitioned-hash-join-node.h
index 37c2c4f..a76dced 100644
--- a/be/src/exec/partitioned-hash-join-node.h
+++ b/be/src/exec/partitioned-hash-join-node.h
@@ -189,7 +189,7 @@ class PartitionedHashJoinNode : public BlockingJoinNode {
 
   /// Probes and updates the hash table for the current probe row for either
   /// RIGHT_SEMI_JOIN or RIGHT_ANTI_JOIN. For RIGHT_SEMI_JOIN, all matching build
-  /// rows will be appended to the 'out_batch'; For RIGHT_ANTI_JOIN, update the
+  /// rows will be appended to the output batch; For RIGHT_ANTI_JOIN, update the
   /// hash table only if matches are found. The actual output happens in
   /// OutputUnmatchedBuild(). Returns true if probing is done for the current
   /// probe row and should continue to next row.
@@ -207,8 +207,9 @@ class PartitionedHashJoinNode : public BlockingJoinNode {
 
   /// Probes the hash table for the current probe row for LEFT_SEMI_JOIN,
   /// LEFT_ANTI_JOIN or NULL_AWARE_LEFT_ANTI_JOIN. The probe row will be appended
-  /// to 'out_batch' if it's part of the output. Returns true if probing
-  /// is done for the current probe row and should continue to next row.
+  /// to the output batch if there is a match (for LEFT_SEMI_JOIN) or if there is no
+  /// match (for LEFT_ANTI_JOIN). Returns true if probing is done for the current
+  /// probe row and should continue to next row.
   ///
   /// 'out_batch_iterator' is the iterator for the output batch.
   /// 'remaining_capacity' tracks the number of additional rows that can be added to
@@ -223,7 +224,7 @@ class PartitionedHashJoinNode : public BlockingJoinNode {
 
   /// Probes the hash table for the current probe row for LEFT_OUTER_JOIN,
   /// RIGHT_OUTER_JOIN or FULL_OUTER_JOIN. The matching build and/or probe row
-  /// will appended to 'out_batch'. For RIGHT/FULL_OUTER_JOIN, some of the outputs
+  /// will be appended to the output batch. For RIGHT/FULL_OUTER_JOIN, some of the outputs
   /// are added in OutputUnmatchedBuild(). Returns true if probing is done for the
   /// current probe row and should continue to next row.
   ///
@@ -239,6 +240,25 @@ class PartitionedHashJoinNode : public BlockingJoinNode {
       ExprContext* const* conjunct_ctxs, int num_conjuncts,
       RowBatch::Iterator* out_batch_iterator, int* remaining_capacity);
 
+  /// Probes 'current_probe_row_' against the hash tables and appends outputs
+  /// to the output batch. Wrapper around the join-type specific probe row functions
+  /// declared above.
+  template<int const JoinOp>
+  bool inline ProcessProbeRow(
+      ExprContext* const* other_join_conjunct_ctxs, int num_other_join_conjuncts,
+      ExprContext* const* conjunct_ctxs, int num_conjuncts,
+      RowBatch::Iterator* out_batch_iterator, int* remaining_capacity, Status* status);
+
+  /// Evaluates some number of rows in 'probe_batch_' against the probe expressions
+  /// and hashes the results to 32-bit hash values. The evaluation results and the hash
+  /// values are stored in the expression values cache of 'ctx'. The number of rows
+  /// processed is at most the capacity of that cache.
+  /// 'prefetch_mode' specifies the prefetching mode in use. Unless it's
+  /// TPrefetchMode::NONE, hash table buckets are prefetched based on the computed hash
+  /// values. Note that 'prefetch_mode' is substituted with a constant during codegen.
+  void EvalAndHashProbePrefetchGroup(TPrefetchMode::type prefetch_mode,
+      HashTableCtx* ctx);
+
   /// Find the next probe row. Returns true if a probe row is found. In which case,
   /// 'current_probe_row_' and 'hash_tbl_iterator_' have been set up to point to the
   /// next probe row and its corresponding partition. 'status' may be updated if
@@ -246,7 +266,7 @@ class PartitionedHashJoinNode : public BlockingJoinNode {
   template<int const JoinOp>
   bool inline NextProbeRow(
       HashTableCtx* ht_ctx, RowBatch::Iterator* probe_batch_iterator,
-      int* remaining_capacity, int num_other_join_conjuncts, Status* status);
+      int* remaining_capacity, Status* status);
 
   /// Process probe rows from probe_batch_. Returns either if out_batch is full or
   /// probe_batch_ is entirely consumed.
@@ -256,11 +276,12 @@ class PartitionedHashJoinNode : public BlockingJoinNode {
   /// set). This function doesn't commit rows to the output batch so it's the caller's
   /// responsibility to do so.
   template<int const JoinOp>
-  int ProcessProbeBatch(RowBatch* out_batch, HashTableCtx* ht_ctx, Status* status);
+  int ProcessProbeBatch(TPrefetchMode::type, RowBatch* out_batch, HashTableCtx* ht_ctx,
+      Status* status);
 
   /// Wrapper that calls the templated version of ProcessProbeBatch() based on 'join_op'.
-  int ProcessProbeBatch(const TJoinOp::type join_op, RowBatch* out_batch,
-      HashTableCtx* ht_ctx, Status* status);
+  int ProcessProbeBatch(const TJoinOp::type join_op, TPrefetchMode::type,
+      RowBatch* out_batch, HashTableCtx* ht_ctx, Status* status);
 
   /// Sweep the hash_tbl_ of the partition that is at the front of
   /// output_build_partitions_, using hash_tbl_iterator_ and output any unmatched build
@@ -553,9 +574,13 @@ class PartitionedHashJoinNode : public BlockingJoinNode {
 
     /// Inserts each row in 'batch' into 'hash_tbl_' using 'ctx'. 'indices' is an array
     /// containing the index of each row's index into the hash table's tuple stream.
-    /// This function is replaced with a codegen'd version.
-    bool InsertBatch(HashTableCtx* ctx, RowBatch* batch,
-        const std::vector<BufferedTupleStream::RowIdx>& indices);
+    /// 'prefetch_mode' is the prefetching mode in use. Unless it's TPrefetchMode::NONE,
+    /// the hash table buckets that the rows hash to will be prefetched. This parameter
+    /// is replaced with a constant during codegen. This function may be replaced with
+    /// a codegen'd version. Returns false as soon as an insertion fails, else true.
+    /// Rows for which EvalAndHashBuild() returns false are skipped, not inserted.
+    bool InsertBatch(TPrefetchMode::type prefetch_mode, HashTableCtx* ctx,
+        RowBatch* batch, const std::vector<BufferedTupleStream::RowIdx>& indices);
 
     PartitionedHashJoinNode* parent_;
 
@@ -581,13 +606,6 @@ class PartitionedHashJoinNode : public BlockingJoinNode {
     /// If NULL, ownership has been transfered.
     BufferedTupleStream* build_rows_;
     BufferedTupleStream* probe_rows_;
-
-    /// Store hash values of each row for the current batch computed during prefetching.
-    std::vector<uint32_t> hash_values_;
-
-    /// Bitmap to indicate rows evaluated to NULL for the current batch when building
-    /// hash tables.
-    Bitmap null_bitmap_;
   };
 
   /// For the below codegen'd functions, xxx_fn_level0_ uses CRC hashing when available
@@ -600,14 +618,14 @@ class PartitionedHashJoinNode : public BlockingJoinNode {
   ProcessBuildBatchFn process_build_batch_fn_;
   ProcessBuildBatchFn process_build_batch_fn_level0_;
 
-  typedef int (*ProcessProbeBatchFn)(
-      PartitionedHashJoinNode*, RowBatch*, HashTableCtx*, Status*);
+  typedef int (*ProcessProbeBatchFn)(PartitionedHashJoinNode*,
+      TPrefetchMode::type, RowBatch*, HashTableCtx*, Status*);
   /// Jitted ProcessProbeBatch function pointers.  NULL if codegen is disabled.
   ProcessProbeBatchFn process_probe_batch_fn_;
   ProcessProbeBatchFn process_probe_batch_fn_level0_;
 
-  typedef bool (*InsertBatchFn)(Partition*, HashTableCtx*, RowBatch*,
-      const std::vector<BufferedTupleStream::RowIdx>&);
+  typedef bool (*InsertBatchFn)(Partition*, TPrefetchMode::type, HashTableCtx*,
+      RowBatch*, const std::vector<BufferedTupleStream::RowIdx>&);
   /// Jitted Partition::InsertBatch() function pointers. NULL if codegen is disabled.
   InsertBatchFn insert_batch_fn_;
   InsertBatchFn insert_batch_fn_level0_;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a59408b5/be/src/exec/partitioned-hash-join-node.inline.h
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-hash-join-node.inline.h b/be/src/exec/partitioned-hash-join-node.inline.h
index d63df1e..8ebeab3 100644
--- a/be/src/exec/partitioned-hash-join-node.inline.h
+++ b/be/src/exec/partitioned-hash-join-node.inline.h
@@ -26,6 +26,7 @@ inline void PartitionedHashJoinNode::ResetForProbe() {
   probe_batch_pos_ = 0;
   matched_probe_ = true;
   hash_tbl_iterator_.SetAtEnd();
+  ht_ctx_->expr_values_cache()->Reset();
 }
 
 inline bool PartitionedHashJoinNode::AppendRow(BufferedTupleStream* stream,

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a59408b5/be/src/exprs/expr-context.cc
----------------------------------------------------------------------
diff --git a/be/src/exprs/expr-context.cc b/be/src/exprs/expr-context.cc
index 7231c5e..2a01bfa 100644
--- a/be/src/exprs/expr-context.cc
+++ b/be/src/exprs/expr-context.cc
@@ -110,8 +110,23 @@ Status ExprContext::Clone(RuntimeState* state, ExprContext** new_ctx) {
   return root_->Open(state, *new_ctx, FunctionContext::THREAD_LOCAL);
 }
 
-void ExprContext::FreeLocalAllocations() {
-  FreeLocalAllocations(fn_contexts_);
+bool ExprContext::HasLocalAllocations(const vector<ExprContext*>& ctxs) {
+  for (int i = 0; i < ctxs.size(); ++i) {
+    if (ctxs[i]->HasLocalAllocations()) return true;
+  }
+  return false;
+}
+
+bool ExprContext::HasLocalAllocations() {
+  return HasLocalAllocations(fn_contexts_);
+}
+
+bool ExprContext::HasLocalAllocations(const std::vector<FunctionContext*>& fn_ctxs) {
+  for (int i = 0; i < fn_ctxs.size(); ++i) {
+    if (fn_ctxs[i]->impl()->closed()) continue;
+    if (fn_ctxs[i]->impl()->HasLocalAllocations()) return true;
+  }
+  return false;
 }
 
 void ExprContext::FreeLocalAllocations(const vector<ExprContext*>& ctxs) {
@@ -120,6 +135,10 @@ void ExprContext::FreeLocalAllocations(const vector<ExprContext*>& ctxs) {
   }
 }
 
+void ExprContext::FreeLocalAllocations() {
+  FreeLocalAllocations(fn_contexts_);
+}
+
 void ExprContext::FreeLocalAllocations(const vector<FunctionContext*>& fn_ctxs) {
   for (int i = 0; i < fn_ctxs.size(); ++i) {
     if (fn_ctxs[i]->impl()->closed()) continue;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a59408b5/be/src/exprs/expr-context.h
----------------------------------------------------------------------
diff --git a/be/src/exprs/expr-context.h b/be/src/exprs/expr-context.h
index 0816774..9078ce6 100644
--- a/be/src/exprs/expr-context.h
+++ b/be/src/exprs/expr-context.h
@@ -123,10 +123,16 @@ class ExprContext {
   TimestampVal GetTimestampVal(TupleRow* row);
   DecimalVal GetDecimalVal(TupleRow* row);
 
+  /// Returns true if any of the expression contexts in the array has local allocations.
+  /// The last two are helper functions.
+  static bool HasLocalAllocations(const std::vector<ExprContext*>& ctxs);
+  bool HasLocalAllocations();
+  static bool HasLocalAllocations(const std::vector<FunctionContext*>& fn_ctxs);
+
   /// Frees all local allocations made by fn_contexts_. This can be called when result
-  /// data from this context is no longer needed.
-  void FreeLocalAllocations();
+  /// data from this context is no longer needed. The last two are helper functions.
   static void FreeLocalAllocations(const std::vector<ExprContext*>& ctxs);
+  void FreeLocalAllocations();
   static void FreeLocalAllocations(const std::vector<FunctionContext*>& ctxs);
 
   static const char* LLVM_CLASS_NAME;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a59408b5/be/src/runtime/row-batch.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/row-batch.h b/be/src/runtime/row-batch.h
index aac0043..3f2ba29 100644
--- a/be/src/runtime/row-batch.h
+++ b/be/src/runtime/row-batch.h
@@ -143,16 +143,21 @@ class RowBatch {
   }
 
   /// An iterator for going through a row batch, starting at 'row_idx'.
-  /// This is more efficient than using GetRow() as it avoids loading the
-  /// row batch state and doing multiplication on each loop with GetRow().
+  /// If 'limit' is specified, it will iterate up to row number 'row_idx + limit'
+  /// or the last row, whichever comes first. Otherwise, it will iterate till the last
+  /// row in the batch. This is more efficient than using GetRow() as it avoids loading
+  /// the row batch state and doing multiplication on each loop with GetRow().
   class Iterator {
    public:
-    IR_ALWAYS_INLINE Iterator(RowBatch* parent, int row_idx) :
+    Iterator(RowBatch* parent, int row_idx, int limit = -1) :
         num_tuples_per_row_(parent->num_tuples_per_row_),
-        row_(parent->tuple_ptrs_ + row_idx * num_tuples_per_row_),
-        row_batch_end_(parent->tuple_ptrs_ + parent->num_rows_ * num_tuples_per_row_),
+        row_(parent->tuple_ptrs_ + num_tuples_per_row_ * row_idx),
+        row_batch_end_(parent->tuple_ptrs_ + num_tuples_per_row_ *
+            (limit == -1 ? parent->num_rows_ :
+                           std::min<int>(row_idx + limit, parent->num_rows_))),
         parent_(parent) {
       DCHECK_GE(row_idx, 0);
+      DCHECK_GT(num_tuples_per_row_, 0);
       /// We allow empty row batches with num_rows_ == capacity_ == 0.
       /// That's why we cannot call GetRow() above to initialize 'row_'.
       DCHECK_LE(row_idx, parent->capacity_);
@@ -397,12 +402,17 @@ class RowBatch {
 
 }
 
-/// Macro for iterating through '_row_batch', starting at '_start_row_idx'.
+/// Macros for iterating through '_row_batch', starting at '_start_row_idx'.
 /// '_row_batch' is the row batch to iterate through.
 /// '_start_row_idx' is the starting row index.
 /// '_iter' is the iterator.
+/// '_limit' is the max number of rows to iterate over.
 #define FOREACH_ROW(_row_batch, _start_row_idx, _iter)                  \
     for (RowBatch::Iterator _iter(_row_batch, _start_row_idx);          \
          !_iter.AtEnd(); _iter.Next())
 
+#define FOREACH_ROW_LIMIT(_row_batch, _start_row_idx, _limit, _iter)    \
+    for (RowBatch::Iterator _iter(_row_batch, _start_row_idx, _limit);  \
+         !_iter.AtEnd(); _iter.Next())
+
 #endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a59408b5/be/src/runtime/test-env.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/test-env.cc b/be/src/runtime/test-env.cc
index 99b227c..39fc7cd 100644
--- a/be/src/runtime/test-env.cc
+++ b/be/src/runtime/test-env.cc
@@ -78,6 +78,7 @@ Status TestEnv::CreateQueryState(int64_t query_id, int max_buffers, int block_si
       block_mgr_parent_tracker_.get(), (*runtime_state)->runtime_profile(),
       tmp_file_mgr_.get(), CalculateMemLimit(max_buffers, block_size), block_size, &mgr));
   (*runtime_state)->set_block_mgr(mgr);
+  (*runtime_state)->InitMemTrackers(TUniqueId(), NULL, -1);
 
   query_states_.push_back(shared_ptr<RuntimeState>(*runtime_state));
   return Status::OK();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a59408b5/be/src/service/query-options.cc
----------------------------------------------------------------------
diff --git a/be/src/service/query-options.cc b/be/src/service/query-options.cc
index 274776c..d12ce1f 100644
--- a/be/src/service/query-options.cc
+++ b/be/src/service/query-options.cc
@@ -390,6 +390,17 @@ Status impala::SetQueryOption(const string& key, const string& value,
             iequals(value, "true") || iequals(value, "1"));
         break;
       }
+      case TImpalaQueryOptions::PREFETCH_MODE: {
+        if (iequals(value, "NONE") || iequals(value, "0")) {
+          query_options->__set_prefetch_mode(TPrefetchMode::NONE);
+        } else if (iequals(value, "HT_BUCKET") || iequals(value, "1")) {
+          query_options->__set_prefetch_mode(TPrefetchMode::HT_BUCKET);
+        } else {
+          return Status(Substitute("Invalid prefetch mode '$0'. Valid modes are "
+              "NONE(0) or HT_BUCKET(1)", value));
+        }
+        break;
+      }
       default:
         // We hit this DCHECK(false) if we forgot to add the corresponding entry here
         // when we add a new query option.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a59408b5/be/src/service/query-options.h
----------------------------------------------------------------------
diff --git a/be/src/service/query-options.h b/be/src/service/query-options.h
index fb24530..60c122d 100644
--- a/be/src/service/query-options.h
+++ b/be/src/service/query-options.h
@@ -32,7 +32,7 @@ class TQueryOptions;
 // the DCHECK.
 #define QUERY_OPTS_TABLE\
   DCHECK_EQ(_TImpalaQueryOptions_VALUES_TO_NAMES.size(),\
-      TImpalaQueryOptions::RUNTIME_FILTER_MIN_SIZE + 1);\
+      TImpalaQueryOptions::PREFETCH_MODE + 1);\
   QUERY_OPT_FN(abort_on_default_limit_exceeded, ABORT_ON_DEFAULT_LIMIT_EXCEEDED)\
   QUERY_OPT_FN(abort_on_error, ABORT_ON_ERROR)\
   QUERY_OPT_FN(allow_unsupported_formats, ALLOW_UNSUPPORTED_FORMATS)\
@@ -78,7 +78,8 @@ class TQueryOptions;
   QUERY_OPT_FN(mt_num_cores, MT_NUM_CORES)\
   QUERY_OPT_FN(s3_skip_insert_staging, S3_SKIP_INSERT_STAGING)\
   QUERY_OPT_FN(runtime_filter_min_size, RUNTIME_FILTER_MIN_SIZE)\
-  QUERY_OPT_FN(runtime_filter_max_size, RUNTIME_FILTER_MAX_SIZE);
+  QUERY_OPT_FN(runtime_filter_max_size, RUNTIME_FILTER_MAX_SIZE)\
+  QUERY_OPT_FN(prefetch_mode, PREFETCH_MODE);
 
 /// Converts a TQueryOptions struct into a map of key, value pairs.
 void TQueryOptionsToMap(const TQueryOptions& query_options,

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a59408b5/be/src/udf/udf-internal.h
----------------------------------------------------------------------
diff --git a/be/src/udf/udf-internal.h b/be/src/udf/udf-internal.h
index 3e5993b..10ce43b 100644
--- a/be/src/udf/udf-internal.h
+++ b/be/src/udf/udf-internal.h
@@ -84,6 +84,9 @@ class FunctionContextImpl {
   /// Frees all allocations returned by AllocateLocal().
   void FreeLocalAllocations();
 
+  /// Returns true if there are any allocations returned by AllocateLocal().
+  bool HasLocalAllocations() const { return !local_allocations_.empty(); }
+
   /// Sets constant_args_. The AnyVal* values are owned by the caller.
   void SetConstantArgs(const std::vector<impala_udf::AnyVal*>& constant_args);
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a59408b5/common/thrift/ImpalaInternalService.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/ImpalaInternalService.thrift b/common/thrift/ImpalaInternalService.thrift
index 74bef28..ada1a20 100644
--- a/common/thrift/ImpalaInternalService.thrift
+++ b/common/thrift/ImpalaInternalService.thrift
@@ -193,6 +193,9 @@ struct TQueryOptions {
 
   // Maximum runtime filter size, in bytes
   47: optional i32 runtime_filter_max_size = 16777216
+
+  // Prefetching behavior during hash tables' building and probing.
+  48: optional Types.TPrefetchMode prefetch_mode = Types.TPrefetchMode.HT_BUCKET
 }
 
 // Impala currently has two types of sessions: Beeswax and HiveServer2

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a59408b5/common/thrift/ImpalaService.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/ImpalaService.thrift b/common/thrift/ImpalaService.thrift
index 9421ec4..29ae4cc 100644
--- a/common/thrift/ImpalaService.thrift
+++ b/common/thrift/ImpalaService.thrift
@@ -219,7 +219,10 @@ enum TImpalaQueryOptions {
   RUNTIME_FILTER_MAX_SIZE,
 
   // Minimum runtime filter size, in bytes.
-  RUNTIME_FILTER_MIN_SIZE
+  RUNTIME_FILTER_MIN_SIZE,
+
+  // Prefetching behavior during hash tables' building and probing.
+  PREFETCH_MODE
 }
 
 // The summary of an insert.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a59408b5/common/thrift/Types.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/Types.thrift b/common/thrift/Types.thrift
index 85a14dc..24f4524 100644
--- a/common/thrift/Types.thrift
+++ b/common/thrift/Types.thrift
@@ -120,6 +120,14 @@ enum TRuntimeFilterMode {
   GLOBAL
 }
 
+enum TPrefetchMode {
+  // No prefetching at all.
+  NONE,
+
+  // Prefetch the hash table buckets.
+  HT_BUCKET
+}
+
 // A TNetworkAddress is the standard host, port representation of a
 // network address. The hostname field must be resolvable to an IPv4
 // address.


[4/4] incubator-impala git commit: IMPALA-3286: Prefetching for PHJ probing.

Posted by ta...@apache.org.
IMPALA-3286: Prefetching for PHJ probing.

This change pipelines the code which probes the hash tables.
This is based on the idea which Mostafa presented earlier.
Essentially, all rows in a row batch will be evaluated and
hashed first before being probed against the hash tables.
Hash table buckets are prefetched as hash values of rows are
computed.

To avoid re-evaluating the rows during probing (they have already
been evaluated once to compute the hash values), the hash table
context has been updated to cache the evaluated expression values,
null bits and hash values of a number of rows. The hash table context
provides a new iterator-like interface to iterate through the cached
values.

A PREFETCH_MODE query option has also been added to disable prefetching
if necessary. The default mode is 1 (HT_BUCKET), which means hash table
buckets will be prefetched. In the future, this option may be extended
to also support prefetching the data stored in hash table buckets.

Combined with build-side prefetching, a self join of table lineitem
improves by 40% on average in a single-node run:

select count(*)
from lineitem o1, lineitem o2
where o1.l_orderkey = o2.l_orderkey and
      o1.l_linenumber = o2.l_linenumber;

Change-Id: Ib42b93d99d09c833571e39d20d58c11ef73f3cc0
Reviewed-on: http://gerrit.cloudera.org:8080/2959
Reviewed-by: Michael Ho <kw...@cloudera.com>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/a59408b5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/a59408b5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/a59408b5

Branch: refs/heads/master
Commit: a59408b575ff6f48b8f0e1084ebcf7489c9de8af
Parents: b634a55
Author: Michael Ho <kw...@cloudera.com>
Authored: Wed Apr 27 17:09:50 2016 -0700
Committer: Tim Armstrong <ta...@cloudera.com>
Committed: Tue May 17 01:30:12 2016 -0700

----------------------------------------------------------------------
 be/src/exec/hash-table-test.cc                  | 169 ++++---
 be/src/exec/hash-table.cc                       | 484 ++++++++++++++-----
 be/src/exec/hash-table.h                        | 334 +++++++++----
 be/src/exec/hash-table.inline.h                 |  60 +--
 be/src/exec/partitioned-aggregation-node-ir.cc  |  14 +-
 be/src/exec/partitioned-aggregation-node.cc     |  16 +-
 be/src/exec/partitioned-hash-join-node-ir.cc    | 344 ++++++++-----
 be/src/exec/partitioned-hash-join-node.cc       | 122 +++--
 be/src/exec/partitioned-hash-join-node.h        |  62 ++-
 be/src/exec/partitioned-hash-join-node.inline.h |   1 +
 be/src/exprs/expr-context.cc                    |  23 +-
 be/src/exprs/expr-context.h                     |  10 +-
 be/src/runtime/row-batch.h                      |  22 +-
 be/src/runtime/test-env.cc                      |   1 +
 be/src/service/query-options.cc                 |  11 +
 be/src/service/query-options.h                  |   5 +-
 be/src/udf/udf-internal.h                       |   3 +
 common/thrift/ImpalaInternalService.thrift      |   3 +
 common/thrift/ImpalaService.thrift              |   5 +-
 common/thrift/Types.thrift                      |   8 +
 20 files changed, 1177 insertions(+), 520 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a59408b5/be/src/exec/hash-table-test.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hash-table-test.cc b/be/src/exec/hash-table-test.cc
index 25cd4f1..7559473 100644
--- a/be/src/exec/hash-table-test.cc
+++ b/be/src/exec/hash-table-test.cc
@@ -145,13 +145,12 @@ class HashTableTest : public testing::Test {
 
   void ProbeTest(HashTable* table, HashTableCtx* ht_ctx,
       ProbeTestData* data, int num_data, bool scan) {
-    uint32_t hash = 0;
     for (int i = 0; i < num_data; ++i) {
       TupleRow* row = data[i].probe_row;
 
       HashTable::Iterator iter;
-      if (ht_ctx->EvalAndHashProbe(row, &hash)) continue;
-      iter = table->FindProbeRow(ht_ctx, hash);
+      if (ht_ctx->EvalAndHashProbe(row)) continue;
+      iter = table->FindProbeRow(ht_ctx);
 
       if (data[i].expected_build_rows.size() == 0) {
         EXPECT_TRUE(iter.AtEnd());
@@ -183,7 +182,6 @@ class HashTableTest : public testing::Test {
       int max_num_blocks = 100, int reserved_blocks = 10) {
     EXPECT_TRUE(test_env_->CreateQueryState(0, max_num_blocks, block_size,
         &runtime_state_).ok());
-
     BufferedBlockMgr::Client* client;
     EXPECT_TRUE(runtime_state_->block_mgr()->RegisterClient("", reserved_blocks, false,
         &tracker_, runtime_state_, &client).ok());
@@ -236,19 +234,21 @@ class HashTableTest : public testing::Test {
     // Create the hash table and insert the build rows
     scoped_ptr<HashTable> hash_table;
     ASSERT_TRUE(CreateHashTable(true, 1024, &hash_table));
-    HashTableCtx ht_ctx(build_expr_ctxs_, probe_expr_ctxs_, true /* stores_nulls_ */,
-        std::vector<bool>(build_expr_ctxs_.size(), false), 1, 0, 1);
-
+    scoped_ptr<HashTableCtx> ht_ctx;
+    Status status = HashTableCtx::Create(runtime_state_, build_expr_ctxs_,
+        probe_expr_ctxs_, true /* stores_nulls_ */,
+        vector<bool>(build_expr_ctxs_.size(), false), 1, 0, 1, &tracker_, &ht_ctx);
+    EXPECT_OK(status);
     for (int i = 0; i < 2; ++i) {
-      uint32_t hash = 0;
-      if (!ht_ctx.EvalAndHashBuild(build_rows[i], &hash)) continue;
+      if (!ht_ctx->EvalAndHashBuild(build_rows[i])) continue;
       BufferedTupleStream::RowIdx dummy_row_idx;
       EXPECT_TRUE(hash_table->stores_tuples_);
-      bool inserted = hash_table->Insert(&ht_ctx, dummy_row_idx, build_rows[i], hash);
+      bool inserted = hash_table->Insert(ht_ctx.get(), dummy_row_idx, build_rows[i]);
       EXPECT_TRUE(inserted);
     }
     EXPECT_EQ(hash_table->num_buckets() - hash_table->EmptyBuckets(), 1);
     hash_table->Close();
+    ht_ctx->Close();
   }
 
   // This test inserts the build rows [0->5) to hash table. It validates that they
@@ -269,51 +269,52 @@ class HashTableTest : public testing::Test {
     // Create the hash table and insert the build rows
     scoped_ptr<HashTable> hash_table;
     ASSERT_TRUE(CreateHashTable(quadratic, initial_num_buckets, &hash_table));
-    HashTableCtx ht_ctx(build_expr_ctxs_, probe_expr_ctxs_, false,
-        std::vector<bool>(build_expr_ctxs_.size(), false), 1, 0, 1);
-
-    uint32_t hash = 0;
-    bool success = hash_table->CheckAndResize(5, &ht_ctx);
+    scoped_ptr<HashTableCtx> ht_ctx;
+    Status status = HashTableCtx::Create(runtime_state_, build_expr_ctxs_,
+        probe_expr_ctxs_, false /* !stores_nulls_ */,
+        vector<bool>(build_expr_ctxs_.size(), false), 1, 0, 1, &tracker_, &ht_ctx);
+    EXPECT_OK(status);
+    bool success = hash_table->CheckAndResize(5, ht_ctx.get());
     ASSERT_TRUE(success);
     for (int i = 0; i < 5; ++i) {
-      if (!ht_ctx.EvalAndHashBuild(build_rows[i], &hash)) continue;
+      if (!ht_ctx->EvalAndHashBuild(build_rows[i])) continue;
       BufferedTupleStream::RowIdx dummy_row_idx;
       EXPECT_TRUE(hash_table->stores_tuples_);
-      bool inserted = hash_table->Insert(&ht_ctx, dummy_row_idx, build_rows[i], hash);
+      bool inserted = hash_table->Insert(ht_ctx.get(), dummy_row_idx, build_rows[i]);
       EXPECT_TRUE(inserted);
     }
     EXPECT_EQ(hash_table->size(), 5);
 
     // Do a full table scan and validate returned pointers
-    FullScan(hash_table.get(), &ht_ctx, 0, 5, true, scan_rows, build_rows);
-    ProbeTest(hash_table.get(), &ht_ctx, probe_rows, 10, false);
+    FullScan(hash_table.get(), ht_ctx.get(), 0, 5, true, scan_rows, build_rows);
+    ProbeTest(hash_table.get(), ht_ctx.get(), probe_rows, 10, false);
 
     // Double the size of the hash table and scan again.
-    ResizeTable(hash_table.get(), 2048, &ht_ctx);
+    ResizeTable(hash_table.get(), 2048, ht_ctx.get());
     EXPECT_EQ(hash_table->num_buckets(), 2048);
     EXPECT_EQ(hash_table->size(), 5);
     memset(scan_rows, 0, sizeof(scan_rows));
-    FullScan(hash_table.get(), &ht_ctx, 0, 5, true, scan_rows, build_rows);
-    ProbeTest(hash_table.get(), &ht_ctx, probe_rows, 10, false);
+    FullScan(hash_table.get(), ht_ctx.get(), 0, 5, true, scan_rows, build_rows);
+    ProbeTest(hash_table.get(), ht_ctx.get(), probe_rows, 10, false);
 
     // Try to shrink and scan again.
-    ResizeTable(hash_table.get(), 64, &ht_ctx);
+    ResizeTable(hash_table.get(), 64, ht_ctx.get());
     EXPECT_EQ(hash_table->num_buckets(), 64);
     EXPECT_EQ(hash_table->size(), 5);
     memset(scan_rows, 0, sizeof(scan_rows));
-    FullScan(hash_table.get(), &ht_ctx, 0, 5, true, scan_rows, build_rows);
-    ProbeTest(hash_table.get(), &ht_ctx, probe_rows, 10, false);
+    FullScan(hash_table.get(), ht_ctx.get(), 0, 5, true, scan_rows, build_rows);
+    ProbeTest(hash_table.get(), ht_ctx.get(), probe_rows, 10, false);
 
     // Resize to 8, which is the smallest value to fit the number of filled buckets.
-    ResizeTable(hash_table.get(), 8, &ht_ctx);
+    ResizeTable(hash_table.get(), 8, ht_ctx.get());
     EXPECT_EQ(hash_table->num_buckets(), 8);
     EXPECT_EQ(hash_table->size(), 5);
     memset(scan_rows, 0, sizeof(scan_rows));
-    FullScan(hash_table.get(), &ht_ctx, 0, 5, true, scan_rows, build_rows);
-    ProbeTest(hash_table.get(), &ht_ctx, probe_rows, 10, false);
+    FullScan(hash_table.get(), ht_ctx.get(), 0, 5, true, scan_rows, build_rows);
+    ProbeTest(hash_table.get(), ht_ctx.get(), probe_rows, 10, false);
 
     hash_table->Close();
-    ht_ctx.Close();
+    ht_ctx->Close();
   }
 
   void ScanTest(bool quadratic, int initial_size, int rows_to_insert,
@@ -322,24 +323,26 @@ class HashTableTest : public testing::Test {
     ASSERT_TRUE(CreateHashTable(quadratic, initial_size, &hash_table));
 
     int total_rows = rows_to_insert + additional_rows;
-    HashTableCtx ht_ctx(build_expr_ctxs_, probe_expr_ctxs_, false,
-        std::vector<bool>(build_expr_ctxs_.size(), false), 1, 0, 1);
+    scoped_ptr<HashTableCtx> ht_ctx;
+    Status status = HashTableCtx::Create(runtime_state_, build_expr_ctxs_,
+        probe_expr_ctxs_, false /* !stores_nulls_ */,
+        vector<bool>(build_expr_ctxs_.size(), false), 1, 0, 1, &tracker_, &ht_ctx);
+    EXPECT_OK(status);
 
     // Add 1 row with val 1, 2 with val 2, etc.
     vector<TupleRow*> build_rows;
     ProbeTestData* probe_rows = new ProbeTestData[total_rows];
     probe_rows[0].probe_row = CreateTupleRow(0);
-    uint32_t hash = 0;
     for (int val = 1; val <= rows_to_insert; ++val) {
-      bool success = hash_table->CheckAndResize(val, &ht_ctx);
+      bool success = hash_table->CheckAndResize(val, ht_ctx.get());
       EXPECT_TRUE(success) << " failed to resize: " << val;
       probe_rows[val].probe_row = CreateTupleRow(val);
       for (int i = 0; i < val; ++i) {
         TupleRow* row = CreateTupleRow(val);
-        if (!ht_ctx.EvalAndHashBuild(row, &hash)) continue;
+        if (!ht_ctx->EvalAndHashBuild(row)) continue;
         BufferedTupleStream::RowIdx dummy_row_idx;
         EXPECT_TRUE(hash_table->stores_tuples_);
-        hash_table->Insert(&ht_ctx, dummy_row_idx, row, hash);
+        hash_table->Insert(ht_ctx.get(), dummy_row_idx, row);
         build_rows.push_back(row);
         probe_rows[val].expected_build_rows.push_back(row);
       }
@@ -351,22 +354,22 @@ class HashTableTest : public testing::Test {
     }
 
     // Test that all the builds were found.
-    ProbeTest(hash_table.get(), &ht_ctx, probe_rows, total_rows, true);
+    ProbeTest(hash_table.get(), ht_ctx.get(), probe_rows, total_rows, true);
 
     // Resize and try again.
     int target_size = BitUtil::RoundUpToPowerOfTwo(2 * total_rows);
-    ResizeTable(hash_table.get(), target_size, &ht_ctx);
+    ResizeTable(hash_table.get(), target_size, ht_ctx.get());
     EXPECT_EQ(hash_table->num_buckets(), target_size);
-    ProbeTest(hash_table.get(), &ht_ctx, probe_rows, total_rows, true);
+    ProbeTest(hash_table.get(), ht_ctx.get(), probe_rows, total_rows, true);
 
     target_size = BitUtil::RoundUpToPowerOfTwo(total_rows + 1);
-    ResizeTable(hash_table.get(), target_size, &ht_ctx);
+    ResizeTable(hash_table.get(), target_size, ht_ctx.get());
     EXPECT_EQ(hash_table->num_buckets(), target_size);
-    ProbeTest(hash_table.get(), &ht_ctx, probe_rows, total_rows, true);
+    ProbeTest(hash_table.get(), ht_ctx.get(), probe_rows, total_rows, true);
 
     delete [] probe_rows;
     hash_table->Close();
-    ht_ctx.Close();
+    ht_ctx->Close();
   }
 
   // This test continues adding tuples to the hash table and exercises the resize code
@@ -378,27 +381,29 @@ class HashTableTest : public testing::Test {
     MemTracker tracker(100 * 1024 * 1024);
     scoped_ptr<HashTable> hash_table;
     ASSERT_TRUE(CreateHashTable(quadratic, num_to_add, &hash_table));
-    HashTableCtx ht_ctx(build_expr_ctxs_, probe_expr_ctxs_, false,
-        std::vector<bool>(build_expr_ctxs_.size(), false), 1, 0, 1);
+    scoped_ptr<HashTableCtx> ht_ctx;
+    Status status = HashTableCtx::Create(runtime_state_, build_expr_ctxs_,
+        probe_expr_ctxs_, false /* !stores_nulls_ */,
+        vector<bool>(build_expr_ctxs_.size(), false), 1, 0, 1, &tracker_, &ht_ctx);
+    EXPECT_OK(status);
 
     // Inserts num_to_add + (num_to_add^2) + (num_to_add^4) + ... + (num_to_add^20)
     // entries. When num_to_add == 4, then the total number of inserts is 4194300.
     int build_row_val = 0;
-    uint32_t hash = 0;
     for (int i = 0; i < 20; ++i) {
       // Currently the mem used for the bucket is not being tracked by the mem tracker.
       // Thus the resize is expected to be successful.
       // TODO: Keep track of the mem used for the buckets and test cases where we actually
       // hit OOM.
       // TODO: Insert duplicates to also hit OOM.
-      bool success = hash_table->CheckAndResize(num_to_add, &ht_ctx);
+      bool success = hash_table->CheckAndResize(num_to_add, ht_ctx.get());
       EXPECT_TRUE(success) << " failed to resize: " << num_to_add;
       for (int j = 0; j < num_to_add; ++build_row_val, ++j) {
         TupleRow* row = CreateTupleRow(build_row_val);
-        if (!ht_ctx.EvalAndHashBuild(row, &hash)) continue;
+        if (!ht_ctx->EvalAndHashBuild(row)) continue;
         BufferedTupleStream::RowIdx dummy_row_idx;
         EXPECT_TRUE(hash_table->stores_tuples_);
-        bool inserted = hash_table->Insert(&ht_ctx, dummy_row_idx, row, hash);
+        bool inserted = hash_table->Insert(ht_ctx.get(), dummy_row_idx, row);
         if (!inserted) goto done_inserting;
       }
       expected_size += num_to_add;
@@ -410,8 +415,8 @@ class HashTableTest : public testing::Test {
     // Validate that we can find the entries before we went over the limit
     for (int i = 0; i < expected_size * 5; i += 100000) {
       TupleRow* probe_row = CreateTupleRow(i);
-      if (!ht_ctx.EvalAndHashProbe(probe_row, &hash)) continue;
-      HashTable::Iterator iter = hash_table->FindProbeRow(&ht_ctx, hash);
+      if (!ht_ctx->EvalAndHashProbe(probe_row)) continue;
+      HashTable::Iterator iter = hash_table->FindProbeRow(ht_ctx.get());
       if (i < hash_table->size()) {
         EXPECT_TRUE(!iter.AtEnd()) << " i: " << i;
         ValidateMatch(probe_row, iter.GetRow());
@@ -420,7 +425,7 @@ class HashTableTest : public testing::Test {
       }
     }
     hash_table->Close();
-    ht_ctx.Close();
+    ht_ctx->Close();
   }
 
   // This test inserts and probes as many elements as the size of the hash table without
@@ -430,9 +435,12 @@ class HashTableTest : public testing::Test {
   void InsertFullTest(bool quadratic, int table_size) {
     scoped_ptr<HashTable> hash_table;
     ASSERT_TRUE(CreateHashTable(quadratic, table_size, &hash_table));
-    HashTableCtx ht_ctx(build_expr_ctxs_, probe_expr_ctxs_, false,
-        std::vector<bool>(build_expr_ctxs_.size(), false), 1, 0, 1);
     EXPECT_EQ(hash_table->EmptyBuckets(), table_size);
+    scoped_ptr<HashTableCtx> ht_ctx;
+    Status status = HashTableCtx::Create(runtime_state_, build_expr_ctxs_,
+        probe_expr_ctxs_, false /* !stores_nulls_ */,
+        vector<bool>(build_expr_ctxs_.size(), false), 1, 0, 1, &tracker_, &ht_ctx);
+    EXPECT_OK(status);
 
     // Insert and probe table_size different tuples. All of them are expected to be
     // successfully inserted and probed.
@@ -441,30 +449,32 @@ class HashTableTest : public testing::Test {
     bool found;
     for (int build_row_val = 0; build_row_val < table_size; ++build_row_val) {
       TupleRow* row = CreateTupleRow(build_row_val);
-      bool passes = ht_ctx.EvalAndHashBuild(row, &hash);
+      bool passes = ht_ctx->EvalAndHashBuild(row);
+      hash = ht_ctx->expr_values_cache()->ExprValuesHash();
       EXPECT_TRUE(passes);
 
       // Insert using both Insert() and FindBucket() methods.
       if (build_row_val % 2 == 0) {
         BufferedTupleStream::RowIdx dummy_row_idx;
         EXPECT_TRUE(hash_table->stores_tuples_);
-        bool inserted = hash_table->Insert(&ht_ctx, dummy_row_idx, row, hash);
+        bool inserted = hash_table->Insert(ht_ctx.get(), dummy_row_idx, row);
         EXPECT_TRUE(inserted);
       } else {
-        iter = hash_table->FindBuildRowBucket(&ht_ctx, hash, &found);
+        iter = hash_table->FindBuildRowBucket(ht_ctx.get(), &found);
         EXPECT_FALSE(iter.AtEnd());
         EXPECT_FALSE(found);
         iter.SetTuple(row->GetTuple(0), hash);
       }
       EXPECT_EQ(hash_table->EmptyBuckets(), table_size - build_row_val - 1);
 
-      passes = ht_ctx.EvalAndHashProbe(row, &hash);
+      passes = ht_ctx->EvalAndHashProbe(row);
+      hash = ht_ctx->expr_values_cache()->ExprValuesHash();
       EXPECT_TRUE(passes);
-      iter = hash_table->FindProbeRow(&ht_ctx, hash);
+      iter = hash_table->FindProbeRow(ht_ctx.get());
       EXPECT_FALSE(iter.AtEnd());
       EXPECT_EQ(row->GetTuple(0), iter.GetTuple());
 
-      iter = hash_table->FindBuildRowBucket(&ht_ctx, hash, &found);
+      iter = hash_table->FindBuildRowBucket(ht_ctx.get(), &found);
       EXPECT_FALSE(iter.AtEnd());
       EXPECT_TRUE(found);
       EXPECT_EQ(row->GetTuple(0), iter.GetTuple());
@@ -474,18 +484,18 @@ class HashTableTest : public testing::Test {
     // hash table code path.
     EXPECT_EQ(hash_table->EmptyBuckets(), 0);
     TupleRow* probe_row = CreateTupleRow(table_size);
-    bool passes = ht_ctx.EvalAndHashProbe(probe_row, &hash);
+    bool passes = ht_ctx->EvalAndHashProbe(probe_row);
     EXPECT_TRUE(passes);
-    iter = hash_table->FindProbeRow(&ht_ctx, hash);
+    iter = hash_table->FindProbeRow(ht_ctx.get());
     EXPECT_TRUE(iter.AtEnd());
 
     // Since hash_table is full, FindBucket cannot find an empty bucket, so returns End().
-    iter = hash_table->FindBuildRowBucket(&ht_ctx, hash, &found);
+    iter = hash_table->FindBuildRowBucket(ht_ctx.get(), &found);
     EXPECT_TRUE(iter.AtEnd());
     EXPECT_FALSE(found);
 
     hash_table->Close();
-    ht_ctx.Close();
+    ht_ctx->Close();
   }
 
   // This test makes sure we can tolerate the low memory case where we do not have enough
@@ -498,12 +508,15 @@ class HashTableTest : public testing::Test {
     scoped_ptr<HashTable> hash_table;
     ASSERT_FALSE(CreateHashTable(quadratic, table_size, &hash_table, block_size,
           max_num_blocks, reserved_blocks));
-    HashTableCtx ht_ctx(build_expr_ctxs_, probe_expr_ctxs_, false,
-        std::vector<bool>(build_expr_ctxs_.size(), false), 1, 0, 1);
-    HashTable::Iterator iter = hash_table->Begin(&ht_ctx);
+    scoped_ptr<HashTableCtx> ht_ctx;
+    Status status = HashTableCtx::Create(runtime_state_, build_expr_ctxs_,
+        probe_expr_ctxs_, false /* !stores_nulls_ */,
+        vector<bool>(build_expr_ctxs_.size(), false), 1, 0, 1, &tracker_, &ht_ctx);
+    EXPECT_OK(status);
+    HashTable::Iterator iter = hash_table->Begin(ht_ctx.get());
     EXPECT_TRUE(iter.AtEnd());
-
     hash_table->Close();
+    ht_ctx->Close();
   }
 };
 
@@ -582,17 +595,23 @@ TEST_F(HashTableTest, QuadraticInsertFullTest) {
 
 // Test that hashing empty string updates hash value.
 TEST_F(HashTableTest, HashEmpty) {
-  HashTableCtx ht_ctx(build_expr_ctxs_, probe_expr_ctxs_, false,
-      std::vector<bool>(build_expr_ctxs_.size(), false), 1, 2, 1);
+  EXPECT_TRUE(test_env_->CreateQueryState(0, 100, 8 * 1024 * 1024,
+      &runtime_state_).ok());
+  scoped_ptr<HashTableCtx> ht_ctx;
+  Status status = HashTableCtx::Create(runtime_state_, build_expr_ctxs_,
+      probe_expr_ctxs_, false /* !stores_nulls_ */,
+      vector<bool>(build_expr_ctxs_.size(), false), 1, 2, 1, &tracker_, &ht_ctx);
+  EXPECT_OK(status);
+
   uint32_t seed = 9999;
-  ht_ctx.set_level(0);
-  EXPECT_NE(seed, ht_ctx.Hash(NULL, 0, seed));
+  ht_ctx->set_level(0);
+  EXPECT_NE(seed, ht_ctx->Hash(NULL, 0, seed));
   // TODO: level 0 uses CRC hash, which only swaps bytes around on empty input.
-  // EXPECT_NE(seed, ht_ctx.Hash(NULL, 0, ht_ctx.Hash(NULL, 0, seed)));
-  ht_ctx.set_level(1);
-  EXPECT_NE(seed, ht_ctx.Hash(NULL, 0, seed));
-  EXPECT_NE(seed, ht_ctx.Hash(NULL, 0, ht_ctx.Hash(NULL, 0, seed)));
-  ht_ctx.Close();
+  // EXPECT_NE(seed, ht_ctx->Hash(NULL, 0, ht_ctx->Hash(NULL, 0, seed)));
+  ht_ctx->set_level(1);
+  EXPECT_NE(seed, ht_ctx->Hash(NULL, 0, seed));
+  EXPECT_NE(seed, ht_ctx->Hash(NULL, 0, ht_ctx->Hash(NULL, 0, seed)));
+  ht_ctx->Close();
 }
 
 TEST_F(HashTableTest, VeryLowMemTest) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a59408b5/be/src/exec/hash-table.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hash-table.cc b/be/src/exec/hash-table.cc
index 9cab765..953ddce 100644
--- a/be/src/exec/hash-table.cc
+++ b/be/src/exec/hash-table.cc
@@ -84,8 +84,8 @@ static const int NUM_SMALL_DATA_PAGES = sizeof(INITIAL_DATA_PAGE_SIZES) / sizeof
 
 HashTableCtx::HashTableCtx(const std::vector<ExprContext*>& build_expr_ctxs,
     const std::vector<ExprContext*>& probe_expr_ctxs, bool stores_nulls,
-    const std::vector<bool>& finds_nulls, int32_t initial_seed,
-    int max_levels, int num_build_tuples)
+    const std::vector<bool>& finds_nulls, int32_t initial_seed, int max_levels,
+    MemTracker* tracker)
     : build_expr_ctxs_(build_expr_ctxs),
       probe_expr_ctxs_(probe_expr_ctxs),
       stores_nulls_(stores_nulls),
@@ -93,17 +93,13 @@ HashTableCtx::HashTableCtx(const std::vector<ExprContext*>& build_expr_ctxs,
       finds_some_nulls_(std::accumulate(
           finds_nulls_.begin(), finds_nulls_.end(), false, std::logical_or<bool>())),
       level_(0),
-      scratch_row_(reinterpret_cast<TupleRow*>(malloc(sizeof(Tuple*) * num_build_tuples))) {
+      scratch_row_(NULL),
+      tracker_(tracker) {
   DCHECK(!finds_some_nulls_ || stores_nulls_);
   // Compute the layout and buffer size to store the evaluated expr results
   DCHECK_EQ(build_expr_ctxs_.size(), probe_expr_ctxs_.size());
   DCHECK_EQ(build_expr_ctxs_.size(), finds_nulls_.size());
   DCHECK(!build_expr_ctxs_.empty());
-  results_buffer_size_ = Expr::ComputeResultsLayout(build_expr_ctxs_,
-      &expr_values_buffer_offsets_, &var_result_begin_);
-  expr_values_buffer_ = new uint8_t[results_buffer_size_];
-  memset(expr_values_buffer_, 0, sizeof(uint8_t) * results_buffer_size_);
-  expr_value_null_bits_ = new uint8_t[build_expr_ctxs.size()];
 
   // Populate the seeds to use for all the levels. TODO: revisit how we generate these.
   DCHECK_GE(max_levels, 0);
@@ -116,32 +112,65 @@ HashTableCtx::HashTableCtx(const std::vector<ExprContext*>& build_expr_ctxs,
   }
 }
 
+Status HashTableCtx::Create(RuntimeState* state,
+    const std::vector<ExprContext*>& build_expr_ctxs,
+    const std::vector<ExprContext*>& probe_expr_ctxs, bool stores_nulls,
+    const std::vector<bool>& finds_nulls, int32_t initial_seed, int max_levels,
+    int num_build_tuples, MemTracker* tracker, scoped_ptr<HashTableCtx>* ht_ctx) {
+  ht_ctx->reset(new HashTableCtx(build_expr_ctxs, probe_expr_ctxs, stores_nulls,
+      finds_nulls, initial_seed, max_levels, tracker));  // Owned even if Init() fails.
+  return (*ht_ctx)->Init(state, num_build_tuples);
+}
+
+Status HashTableCtx::Init(RuntimeState* state, int num_build_tuples) {
+  const int row_bytes = num_build_tuples * sizeof(Tuple*);
+  scratch_row_ = static_cast<TupleRow*>(malloc(row_bytes));  // Freed in Close().
+  if (UNLIKELY(scratch_row_ == NULL)) {
+    return Status(Substitute(
+        "Failed to allocate $0 bytes for scratch row of HashTableCtx.", row_bytes));
+  }
+  return expr_values_cache_.Init(state, tracker_, build_expr_ctxs_);
+}
+
 void HashTableCtx::Close() {
-  // TODO: use tr1::array?
-  DCHECK(expr_values_buffer_ != NULL);
-  delete[] expr_values_buffer_;
-  expr_values_buffer_ = NULL;
-  DCHECK(expr_value_null_bits_ != NULL);
-  delete[] expr_value_null_bits_;
-  expr_value_null_bits_ = NULL;
   free(scratch_row_);
   scratch_row_ = NULL;
+  expr_values_cache_.Close(tracker_);
+}
+
+uint32_t HashTableCtx::Hash(const void* input, int len, uint32_t hash) const {
+  // Use CRC hash at first level for better performance. Switch to murmur hash at
+  // subsequent levels since CRC doesn't randomize well with different seed inputs.
+  if (level_ == 0) return HashUtil::Hash(input, len, hash);
+  return HashUtil::MurmurHash2_64(input, len, hash);
+}
+
+uint32_t HashTableCtx::HashCurrentRow() const {
+  DCHECK_LT(level_, seeds_.size());
+  if (expr_values_cache_.var_result_offset() == -1) {
+    // No var-len slots. NULLs are handled implicitly since a constant seed value
+    // was stored in the expr values cache in place of each NULL.
+    return Hash(expr_values_cache_.cur_expr_values_,
+        expr_values_cache_.expr_values_bytes_per_row(), seeds_[level_]);
+  } else {
+    return HashVariableLenRow();
+  }
 }
 
 bool HashTableCtx::EvalRow(TupleRow* row, const vector<ExprContext*>& ctxs) {
   bool has_null = false;
+  uint8_t* exprs_nullness = expr_values_cache_.ExprValueNullPtr(0);
   for (int i = 0; i < ctxs.size(); ++i) {
-    void* loc = expr_values_buffer_ + expr_values_buffer_offsets_[i];
+    void* loc = expr_values_cache_.ExprValuePtr(i);
     void* val = ctxs[i]->GetValue(row);
     if (val == NULL) {
       // If the table doesn't store nulls, no reason to keep evaluating
       if (!stores_nulls_) return true;
-
-      expr_value_null_bits_[i] = true;
+      exprs_nullness[i] = true;
       val = reinterpret_cast<void*>(&NULL_VALUE);
       has_null = true;
     } else {
-      expr_value_null_bits_[i] = false;
+      exprs_nullness[i] = false;
     }
     DCHECK_LE(build_expr_ctxs_[i]->root()->type().GetSlotSize(),
         sizeof(NULL_VALUE));
@@ -152,18 +181,20 @@ bool HashTableCtx::EvalRow(TupleRow* row, const vector<ExprContext*>& ctxs) {
 
 uint32_t HashTableCtx::HashVariableLenRow() const {
   uint32_t hash = seeds_[level_];
+  int var_result_offset = expr_values_cache_.var_result_offset();
   // Hash the non-var length portions (if there are any)
-  if (var_result_begin_ != 0) {
-    hash = Hash(expr_values_buffer_, var_result_begin_, hash);
+  if (var_result_offset != 0) {
+    hash = Hash(expr_values_cache_.cur_expr_values_, var_result_offset, hash);
   }
 
+  uint8_t* exprs_nullness = expr_values_cache_.ExprValueNullPtr(0);
   for (int i = 0; i < build_expr_ctxs_.size(); ++i) {
-    // non-string and null slots are already part of expr_values_buffer
+    // non-string and null slots are already part of cur_expr_values_
     if (build_expr_ctxs_[i]->root()->type().type != TYPE_STRING &&
         build_expr_ctxs_[i]->root()->type().type != TYPE_VARCHAR) continue;
 
-    void* loc = expr_values_buffer_ + expr_values_buffer_offsets_[i];
-    if (expr_value_null_bits_[i]) {
+    void* loc = expr_values_cache_.ExprValuePtr(i);
+    if (exprs_nullness[i]) {
       // Hash the null random seed values at 'loc'
       hash = Hash(loc, sizeof(StringValue), hash);
     } else {
@@ -178,17 +209,18 @@ uint32_t HashTableCtx::HashVariableLenRow() const {
 
 template<bool FORCE_NULL_EQUALITY>
 bool HashTableCtx::Equals(TupleRow* build_row) const {
+  uint8_t* exprs_nullness = expr_values_cache_.ExprValueNullPtr(0);
   for (int i = 0; i < build_expr_ctxs_.size(); ++i) {
     void* val = build_expr_ctxs_[i]->GetValue(build_row);
     if (val == NULL) {
       if (!(FORCE_NULL_EQUALITY || finds_nulls_[i])) return false;
-      if (!expr_value_null_bits_[i]) return false;
+      if (!exprs_nullness[i]) return false;
       continue;
     } else {
-      if (expr_value_null_bits_[i]) return false;
+      if (exprs_nullness[i]) return false;
     }
 
-    void* loc = expr_values_buffer_ + expr_values_buffer_offsets_[i];
+    void* loc = expr_values_cache_.ExprValuePtr(i);
     if (!RawValue::Eq(loc, val, build_expr_ctxs_[i]->root()->type())) {
       return false;
     }
@@ -199,6 +231,113 @@ bool HashTableCtx::Equals(TupleRow* build_row) const {
 template bool HashTableCtx::Equals<true>(TupleRow* build_row) const;
 template bool HashTableCtx::Equals<false>(TupleRow* build_row) const;
 
+HashTableCtx::ExprValuesCache::ExprValuesCache()
+    : capacity_(0),
+      cur_expr_values_(NULL),
+      cur_expr_values_null_(NULL),
+      cur_expr_values_hash_(NULL),
+      cur_expr_values_hash_end_(NULL),
+      expr_values_array_(NULL),
+      expr_values_null_array_(NULL),
+      expr_values_hash_array_(NULL),
+      null_bitmap_(0) { }
+
+Status HashTableCtx::ExprValuesCache::Init(RuntimeState* state,
+    MemTracker* tracker, const std::vector<ExprContext*>& build_expr_ctxs) {
+  // Initialize the number of expressions.
+  num_exprs_ = build_expr_ctxs.size();
+  // Compute the layout of evaluated values of a row.
+  expr_values_bytes_per_row_ = Expr::ComputeResultsLayout(build_expr_ctxs,
+      &expr_values_offsets_, &var_result_offset_);
+  if (expr_values_bytes_per_row_ == 0) {
+    DCHECK_EQ(num_exprs_, 0);
+    return Status::OK();
+  }
+  DCHECK_GT(expr_values_bytes_per_row_, 0);
+  // Compute the maximum number of cached rows which can fit in the memory budget.
+  // TODO: Find the optimal prefetch batch size. This may be something
+  // processor dependent so we may need calibration at Impala startup time.
+  capacity_ = std::max(1, std::min(state->batch_size(),
+      MAX_EXPR_VALUES_ARRAY_SIZE / expr_values_bytes_per_row_));
+
+  int mem_usage = MemUsage(capacity_, expr_values_bytes_per_row_, num_exprs_);
+  if (!tracker->TryConsume(mem_usage)) {
+    capacity_ = 0;  // Nothing was consumed, so Close() must not Release().
+    string details = Substitute(
+        "HashTableCtx::ExprValuesCache failed to allocate $0 bytes.", mem_usage);
+    return tracker->MemLimitExceeded(state, details, mem_usage);
+  }
+
+  int expr_values_size = expr_values_bytes_per_row_ * capacity_;
+  expr_values_array_.reset(new uint8_t[expr_values_size]);
+  cur_expr_values_ = expr_values_array_.get();
+  memset(cur_expr_values_, 0, expr_values_size);
+
+  int expr_values_null_size = num_exprs_ * capacity_;
+  expr_values_null_array_.reset(new uint8_t[expr_values_null_size]);
+  cur_expr_values_null_ = expr_values_null_array_.get();
+  memset(cur_expr_values_null_, 0, expr_values_null_size);
+
+  expr_values_hash_array_.reset(new uint32_t[capacity_]);
+  cur_expr_values_hash_ = expr_values_hash_array_.get();
+  cur_expr_values_hash_end_ = cur_expr_values_hash_;
+  memset(cur_expr_values_hash_, 0, sizeof(uint32_t) * capacity_);
+
+  null_bitmap_.Reset(capacity_);
+  return Status::OK();
+}
+
+void HashTableCtx::ExprValuesCache::Close(MemTracker* tracker) {
+  if (capacity_ == 0) return;  // Never initialized, or already closed.
+  cur_expr_values_ = NULL;
+  cur_expr_values_null_ = NULL;
+  cur_expr_values_hash_ = NULL;
+  cur_expr_values_hash_end_ = NULL;
+  expr_values_array_.reset();
+  expr_values_null_array_.reset();
+  expr_values_hash_array_.reset();
+  null_bitmap_.Reset(0);
+  tracker->Release(MemUsage(capacity_, expr_values_bytes_per_row_, num_exprs_));
+  capacity_ = 0;  // A repeated Close() is now a no-op instead of a double Release().
+}
+
+int HashTableCtx::ExprValuesCache::MemUsage(int capacity,
+    int expr_values_bytes_per_row, int num_exprs) {
+  return expr_values_bytes_per_row * capacity + // expr_values_array_
+      num_exprs * capacity +                    // expr_values_null_array_
+      sizeof(uint32_t) * capacity +             // expr_values_hash_array_
+      Bitmap::MemUsage(capacity);               // null_bitmap_
+}
+
+uint8_t* HashTableCtx::ExprValuesCache::ExprValuePtr(int expr_idx) const {
+  return cur_expr_values_ + expr_values_offsets_[expr_idx];
+}
+
+uint8_t* HashTableCtx::ExprValuesCache::ExprValueNullPtr(int expr_idx) const {
+  return cur_expr_values_null_ + expr_idx;
+}
+
+void HashTableCtx::ExprValuesCache::ResetIterators() {
+  cur_expr_values_ = expr_values_array_.get();
+  cur_expr_values_null_ = expr_values_null_array_.get();
+  cur_expr_values_hash_ = expr_values_hash_array_.get();
+}
+
+void HashTableCtx::ExprValuesCache::Reset() {
+  ResetIterators();
+  // Set the end pointer after resetting the other pointers so they point to
+  // the same location.
+  cur_expr_values_hash_end_ = cur_expr_values_hash_;
+  null_bitmap_.SetAllBits(false);
+}
+
+void HashTableCtx::ExprValuesCache::ResetForRead() {
+  // Record the end of hash values iterator to be used in AtEnd().
+  // Do it before resetting the pointers.
+  cur_expr_values_hash_end_ = cur_expr_values_hash_;
+  ResetIterators();
+}
+
 const double HashTable::MAX_FILL_FACTOR = 0.75f;
 
 HashTable* HashTable::Create(RuntimeState* state,
@@ -306,7 +445,7 @@ bool HashTable::ResizeBuckets(int64_t num_buckets, const HashTableCtx* ht_ctx) {
     Bucket* bucket_to_copy = &buckets_[iter.bucket_idx_];
     bool found = false;
     int64_t bucket_idx =
-        Probe<true>(new_buckets, num_buckets, false, NULL, NULL, bucket_to_copy->hash, &found);
+        Probe<true>(new_buckets, num_buckets, NULL, bucket_to_copy->hash, &found);
     DCHECK(!found);
     DCHECK_NE(bucket_idx, Iterator::BUCKET_NOT_FOUND) << " Probe failed even though "
         " there are free buckets. " << num_buckets << " " << num_filled_buckets_;
@@ -458,30 +597,71 @@ static void CodegenAssignNullValue(LlvmCodeGen* codegen,
 }
 
 // Codegen for evaluating a tuple row over either build_expr_ctxs_ or probe_expr_ctxs_.
-// For the case where we are joining on a single int, the IR looks like
-// define i1 @EvalBuildRow(%"class.impala::HashTableCtx"* %this_ptr,
-//                         %"class.impala::TupleRow"* %row) #20 {
+// For a group by with (big int, string) the IR looks like
+// define i1 @EvalProbeRow(%"class.impala::HashTableCtx"* %this_ptr,
+//                         %"class.impala::TupleRow"* %row) #33 {
 // entry:
-//   %result = call i64 @GetSlotRef1(%"class.impala::ExprContext"* inttoptr
-//                                     (i64 67971664 to %"class.impala::ExprContext"*),
-//                                   %"class.impala::TupleRow"* %row)
+//   %0 = load i8*, i8** inttoptr (i64 230325056 to i8**)
+//   %1 = load i8*, i8** inttoptr (i64 230325064 to i8**)
+//   %loc_addr = getelementptr i8, i8* %0, i32 0
+//   %loc = bitcast i8* %loc_addr to i32*
+//   %result = call i64 @GetSlotRef.3(%"class.impala::ExprContext"*
+//             inttoptr (i64 158123712 to %"class.impala::ExprContext"*),
+//             %"class.impala::TupleRow"* %row)
 //   %is_null = trunc i64 %result to i1
-//   %0 = zext i1 %is_null to i8
-//   store i8 %0, i8* inttoptr (i64 95753144 to i8*)
+//   %2 = zext i1 %is_null to i8
+//   %null_byte_loc = getelementptr i8, i8* %1, i32 0
+//   store i8 %2, i8* %null_byte_loc
 //   br i1 %is_null, label %null, label %not_null
 //
 // null:                                             ; preds = %entry
-//   store i32 -2128831035, i32* inttoptr (i64 95753128 to i32*)
+//   store i32 -2128831035, i32* %loc
 //   br label %continue
 //
 // not_null:                                         ; preds = %entry
-//   %1 = ashr i64 %result, 32
-//   %2 = trunc i64 %1 to i32
-//   store i32 %2, i32* inttoptr (i64 95753128 to i32*)
+//   %3 = ashr i64 %result, 32
+//   %4 = trunc i64 %3 to i32
+//   store i32 %4, i32* %loc
 //   br label %continue
 //
 // continue:                                         ; preds = %not_null, %null
-//   ret i1 true
+//   %is_null_phi = phi i1 [ true, %null ], [ false, %not_null ]
+//   %has_null = or i1 false, %is_null_phi
+//   %loc_addr1 = getelementptr i8, i8* %0, i32 8
+//   %loc2 = bitcast i8* %loc_addr1 to %"struct.impala::StringValue"*
+//   %result6 = call { i64, i8* } @GetSlotRef.4(%"class.impala::ExprContext"*
+//              inttoptr (i64 158123904 to %"class.impala::ExprContext"*),
+//              %"class.impala::TupleRow"* %row)
+//   %5 = extractvalue { i64, i8* } %result6, 0
+//   %is_null7 = trunc i64 %5 to i1
+//   %6 = zext i1 %is_null7 to i8
+//   %null_byte_loc8 = getelementptr i8, i8* %1, i32 1
+//   store i8 %6, i8* %null_byte_loc8
+//   br i1 %is_null7, label %null3, label %not_null4
+//
+// null3:                                            ; preds = %continue
+//   %string_ptr = getelementptr inbounds %"struct.impala::StringValue",
+//                 %"struct.impala::StringValue"* %loc2, i32 0, i32 0
+//   %string_len = getelementptr inbounds %"struct.impala::StringValue",
+//                 %"struct.impala::StringValue"* %loc2, i32 0, i32 1
+//   store i8* inttoptr (i32 -2128831035 to i8*), i8** %string_ptr
+//   store i32 -2128831035, i32* %string_len
+//   br label %continue5
+//
+// not_null4:                                        ; preds = %continue
+//   %result9 = extractvalue { i64, i8* } %result6, 1
+//   %7 = insertvalue %"struct.impala::StringValue" zeroinitializer, i8* %result9, 0
+//   %8 = extractvalue { i64, i8* } %result6, 0
+//   %9 = ashr i64 %8, 32
+//   %10 = trunc i64 %9 to i32
+//   %11 = insertvalue %"struct.impala::StringValue" %7, i32 %10, 1
+//   store %"struct.impala::StringValue" %11, %"struct.impala::StringValue"* %loc2
+//   br label %continue5
+//
+// continue5:                                        ; preds = %not_null4, %null3
+//   %is_null_phi10 = phi i1 [ true, %null3 ], [ false, %not_null4 ]
+//   %has_null11 = or i1 %has_null, %is_null_phi10
+//   ret i1 %has_null11
 // }
 // For each expr, we create 3 code blocks.  The null, not null and continue blocks.
 // Both the null and not null branch into the continue block.  The continue block
@@ -509,7 +689,7 @@ Status HashTableCtx::CodegenEvalRow(RuntimeState* state, bool build, Function**
   Type* this_type = codegen->GetType(HashTableCtx::LLVM_CLASS_NAME);
   DCHECK(this_type != NULL);
   PointerType* this_ptr_type = PointerType::get(this_type, 0);
-
+  PointerType* buffer_ptr_type = PointerType::get(codegen->ptr_type(), 0);
   LlvmCodeGen::FnPrototype prototype(codegen, build ? "EvalBuildRow" : "EvalProbeRow",
       codegen->GetType(TYPE_BOOLEAN));
   prototype.AddArgument(LlvmCodeGen::NamedVariable("this_ptr", this_ptr_type));
@@ -519,18 +699,29 @@ Status HashTableCtx::CodegenEvalRow(RuntimeState* state, bool build, Function**
   LlvmCodeGen::LlvmBuilder builder(context);
   Value* args[2];
   *fn = prototype.GeneratePrototype(&builder, args);
-
   Value* row = args[1];
   Value* has_null = codegen->false_value();
 
+  // Load cur_expr_values_ into a LLVM pointer.
+  Value* cur_expr_values_ptr = codegen->CastPtrToLlvmPtr(buffer_ptr_type,
+      &expr_values_cache_.cur_expr_values_);
+  Value* cur_expr_values = builder.CreateLoad(cur_expr_values_ptr);
+
+  // Load cur_expr_values_null_ into a LLVM pointer.
+  Value* cur_expr_values_null_ptr = codegen->CastPtrToLlvmPtr(buffer_ptr_type,
+      &expr_values_cache_.cur_expr_values_null_);
+  Value* cur_expr_values_null = builder.CreateLoad(cur_expr_values_null_ptr);
+
   for (int i = 0; i < ctxs.size(); ++i) {
     // TODO: refactor this to somewhere else?  This is not hash table specific except for
     // the null handling bit and would be used for anyone that needs to materialize a
     // vector of exprs
     // Convert result buffer to llvm ptr type
-    void* loc = expr_values_buffer_ + expr_values_buffer_offsets_[i];
-    Value* llvm_loc = codegen->CastPtrToLlvmPtr(
-        codegen->GetPtrType(ctxs[i]->root()->type()), loc);
+    int offset = expr_values_cache_.expr_values_offsets(i);
+    Value* loc = builder.CreateGEP(NULL, cur_expr_values,
+        codegen->GetIntConstant(TYPE_INT, offset), "loc_addr");
+    Value* llvm_loc = builder.CreatePointerCast(loc,
+        codegen->GetPtrType(ctxs[i]->root()->type()), "loc");
 
     BasicBlock* null_block = BasicBlock::Create(context, "null", *fn);
     BasicBlock* not_null_block = BasicBlock::Create(context, "not_null", *fn);
@@ -555,11 +746,9 @@ Status HashTableCtx::CodegenEvalRow(RuntimeState* state, bool build, Function**
 
     // Set null-byte result
     Value* null_byte = builder.CreateZExt(is_null, codegen->GetType(TYPE_TINYINT));
-    uint8_t* null_byte_loc = &expr_value_null_bits_[i];
-    Value* llvm_null_byte_loc =
-        codegen->CastPtrToLlvmPtr(codegen->ptr_type(), null_byte_loc);
+    Value* llvm_null_byte_loc = builder.CreateGEP(NULL, cur_expr_values_null,
+        codegen->GetIntConstant(TYPE_INT, i), "null_byte_loc");
     builder.CreateStore(null_byte, llvm_null_byte_loc);
-
     builder.CreateCondBr(is_null, null_block, not_null_block);
 
     // Null block
@@ -599,30 +788,37 @@ Status HashTableCtx::CodegenEvalRow(RuntimeState* state, bool build, Function**
 
 // Codegen for hashing the current row.  In the case with both string and non-string data
 // (group by int_col, string_col), the IR looks like:
-// define i32 @HashCurrentRow(%"class.impala::HashTableCtx"* %this_ptr) #20 {
+// define i32 @HashCurrentRow(%"class.impala::HashTableCtx"* %this_ptr) #33 {
 // entry:
-//   %seed = call i32 @GetHashSeed(%"class.impala::HashTableCtx"* %this_ptr)
-//   %0 = call i32 @CrcHash16(i8* inttoptr (i64 119151296 to i8*), i32 16, i32 %seed)
-//   %1 = load i8* inttoptr (i64 119943721 to i8*)
-//   %2 = icmp ne i8 %1, 0
-//   br i1 %2, label %null, label %not_null
+//   %0 = load i8*, i8** inttoptr (i64 230325056 to i8**)
+//   %1 = load i8*, i8** inttoptr (i64 230325064 to i8**)
+//   %seed = call i32 @_ZNK6impala12HashTableCtx11GetHashSeedEv(
+//           %"class.impala::HashTableCtx"* %this_ptr)
+//   %hash = call i32 @CrcHash8(i8* %0, i32 8, i32 %seed)
+//   %loc_addr = getelementptr i8, i8* %0, i32 8
+//   %null_byte_loc = getelementptr i8, i8* %1, i32 1
+//   %null_byte = load i8, i8* %null_byte_loc
+//   %is_null = icmp ne i8 %null_byte, 0
+//   br i1 %is_null, label %null, label %not_null
 //
 // null:                                             ; preds = %entry
-//   %3 = call i32 @CrcHash161(i8* inttoptr (i64 119151312 to i8*), i32 16, i32 %0)
+//   %str_null = call i32 @CrcHash16(i8* %loc_addr, i32 16, i32 %hash)
 //   br label %continue
 //
 // not_null:                                         ; preds = %entry
-//   %4 = load i8** getelementptr inbounds (%"struct.impala::StringValue"* inttoptr
-//       (i64 119151312 to %"struct.impala::StringValue"*), i32 0, i32 0)
-//   %5 = load i32* getelementptr inbounds (%"struct.impala::StringValue"* inttoptr
-//       (i64 119151312 to %"struct.impala::StringValue"*), i32 0, i32 1)
-//   %6 = call i32 @IrCrcHash(i8* %4, i32 %5, i32 %0)
+//   %str_val = bitcast i8* %loc_addr to %"struct.impala::StringValue"*
+//   %2 = getelementptr inbounds %"struct.impala::StringValue",
+//        %"struct.impala::StringValue"* %str_val, i32 0, i32 0
+//   %3 = getelementptr inbounds %"struct.impala::StringValue",
+//        %"struct.impala::StringValue"* %str_val, i32 0, i32 1
+//   %ptr = load i8*, i8** %2
+//   %len = load i32, i32* %3
+//   %string_hash = call i32 @IrCrcHash(i8* %ptr, i32 %len, i32 %hash)
 //   br label %continue
 //
 // continue:                                         ; preds = %not_null, %null
-//   %7 = phi i32 [ %6, %not_null ], [ %3, %null ]
-//   call void @set_hash(%"class.impala::HashTableCtx"* %this_ptr, i32 %7)
-//   ret i32 %7
+//   %hash_phi = phi i32 [ %string_hash, %not_null ], [ %str_null, %null ]
+//   ret i32 %hash_phi
 // }
 Status HashTableCtx::CodegenHashCurrentRow(RuntimeState* state, bool use_murmur,
     Function** fn) {
@@ -640,6 +836,7 @@ Status HashTableCtx::CodegenHashCurrentRow(RuntimeState* state, bool use_murmur,
   Type* this_type = codegen->GetType(HashTableCtx::LLVM_CLASS_NAME);
   DCHECK(this_type != NULL);
   PointerType* this_ptr_type = PointerType::get(this_type, 0);
+  PointerType* buffer_ptr_type = PointerType::get(codegen->ptr_type(), 0);
 
   LlvmCodeGen::FnPrototype prototype(codegen,
       (use_murmur ? "MurmurHashCurrentRow" : "HashCurrentRow"),
@@ -651,6 +848,16 @@ Status HashTableCtx::CodegenHashCurrentRow(RuntimeState* state, bool use_murmur,
   Value* this_arg;
   *fn = prototype.GeneratePrototype(&builder, &this_arg);
 
+  // Load cur_expr_values_ into a LLVM pointer.
+  Value* cur_expr_values_ptr =
+      codegen->CastPtrToLlvmPtr(buffer_ptr_type, &expr_values_cache_.cur_expr_values_);
+  Value* cur_expr_values = builder.CreateLoad(cur_expr_values_ptr);
+
+  // Load cur_expr_values_null_ into a LLVM pointer.
+  Value* cur_expr_values_null_ptr =
+      codegen->CastPtrToLlvmPtr(buffer_ptr_type, &expr_values_cache_.cur_expr_values_null_);
+  Value* cur_expr_values_null = builder.CreateLoad(cur_expr_values_null_ptr);
+
   // Call GetHashSeed() to get seeds_[level_]
   Function* get_hash_seed_fn =
       codegen->GetFunction(IRFunction::HASH_TABLE_GET_HASH_SEED, false);
@@ -658,25 +865,26 @@ Status HashTableCtx::CodegenHashCurrentRow(RuntimeState* state, bool use_murmur,
       "seed");
 
   Value* hash_result = seed;
-  Value* data = codegen->CastPtrToLlvmPtr(codegen->ptr_type(), expr_values_buffer_);
-  if (var_result_begin_ == -1) {
-    // No variable length slots, just hash what is in 'expr_values_buffer_'
-    if (results_buffer_size_ > 0) {
+  const int var_result_offset = expr_values_cache_.var_result_offset();
+  const int expr_values_bytes_per_row = expr_values_cache_.expr_values_bytes_per_row();
+  if (var_result_offset == -1) {
+    // No variable length slots, just hash what is in 'expr_expr_values_cache_'
+    if (expr_values_bytes_per_row > 0) {
       Function* hash_fn = use_murmur ?
-                          codegen->GetMurmurHashFunction(results_buffer_size_) :
-                          codegen->GetHashFunction(results_buffer_size_);
-      Value* len = codegen->GetIntConstant(TYPE_INT, results_buffer_size_);
+                          codegen->GetMurmurHashFunction(expr_values_bytes_per_row) :
+                          codegen->GetHashFunction(expr_values_bytes_per_row);
+      Value* len = codegen->GetIntConstant(TYPE_INT, expr_values_bytes_per_row);
       hash_result = builder.CreateCall(hash_fn,
-          ArrayRef<Value*>({data, len, hash_result}), "hash");
+          ArrayRef<Value*>({cur_expr_values, len, hash_result}), "hash");
     }
   } else {
-    if (var_result_begin_ > 0) {
+    if (var_result_offset > 0) {
       Function* hash_fn = use_murmur ?
-                          codegen->GetMurmurHashFunction(var_result_begin_) :
-                          codegen->GetHashFunction(var_result_begin_);
-      Value* len = codegen->GetIntConstant(TYPE_INT, var_result_begin_);
+                          codegen->GetMurmurHashFunction(var_result_offset) :
+                          codegen->GetHashFunction(var_result_offset);
+      Value* len = codegen->GetIntConstant(TYPE_INT, var_result_offset);
       hash_result = builder.CreateCall(hash_fn,
-          ArrayRef<Value*>({data, len, hash_result}), "hash");
+          ArrayRef<Value*>({cur_expr_values, len, hash_result}), "hash");
     }
 
     // Hash string slots
@@ -689,7 +897,9 @@ Status HashTableCtx::CodegenHashCurrentRow(RuntimeState* state, bool use_murmur,
       BasicBlock* continue_block = NULL;
       Value* str_null_result = NULL;
 
-      void* loc = expr_values_buffer_ + expr_values_buffer_offsets_[i];
+      int offset = expr_values_cache_.expr_values_offsets(i);
+      Value* llvm_loc = builder.CreateGEP(NULL, cur_expr_values,
+          codegen->GetIntConstant(TYPE_INT, offset), "loc_addr");
 
       // If the hash table stores nulls, we need to check if the stringval
       // evaluated to NULL
@@ -698,9 +908,8 @@ Status HashTableCtx::CodegenHashCurrentRow(RuntimeState* state, bool use_murmur,
         not_null_block = BasicBlock::Create(context, "not_null", *fn);
         continue_block = BasicBlock::Create(context, "continue", *fn);
 
-        uint8_t* null_byte_loc = &expr_value_null_bits_[i];
-        Value* llvm_null_byte_loc =
-            codegen->CastPtrToLlvmPtr(codegen->ptr_type(), null_byte_loc);
+        Value* llvm_null_byte_loc = builder.CreateGEP(NULL, cur_expr_values_null,
+            codegen->GetIntConstant(TYPE_INT, i), "null_byte_loc");
         Value* null_byte = builder.CreateLoad(llvm_null_byte_loc, "null_byte");
         Value* is_null = builder.CreateICmpNE(null_byte,
             codegen->GetIntConstant(TYPE_TINYINT, 0), "is_null");
@@ -712,7 +921,6 @@ Status HashTableCtx::CodegenHashCurrentRow(RuntimeState* state, bool use_murmur,
         Function* null_hash_fn = use_murmur ?
                                  codegen->GetMurmurHashFunction(sizeof(StringValue)) :
                                  codegen->GetHashFunction(sizeof(StringValue));
-        Value* llvm_loc = codegen->CastPtrToLlvmPtr(codegen->ptr_type(), loc);
         Value* len = codegen->GetIntConstant(TYPE_INT, sizeof(StringValue));
         str_null_result = builder.CreateCall(null_hash_fn,
             ArrayRef<Value*>({llvm_loc, len, hash_result}), "str_null");
@@ -722,7 +930,8 @@ Status HashTableCtx::CodegenHashCurrentRow(RuntimeState* state, bool use_murmur,
       }
 
       // Convert expr_values_buffer_ loc to llvm value
-      Value* str_val = codegen->CastPtrToLlvmPtr(codegen->GetPtrType(TYPE_STRING), loc);
+      Value* str_val = builder.CreatePointerCast(llvm_loc,
+          codegen->GetPtrType(TYPE_STRING), "str_val");
 
       Value* ptr = builder.CreateStructGEP(NULL, str_val, 0);
       Value* len = builder.CreateStructGEP(NULL, str_val, 1);
@@ -759,55 +968,71 @@ Status HashTableCtx::CodegenHashCurrentRow(RuntimeState* state, bool use_murmur,
   return Status::OK();
 }
 
-// Codegen for HashTableCtx::Equals.  For a hash table with two exprs (string,int),
+// Codegen for HashTableCtx::Equals.  For a group by with (bigint, string),
 // the IR looks like:
 //
 // define i1 @Equals(%"class.impala::HashTableCtx"* %this_ptr,
-//                   %"class.impala::TupleRow"* %row) {
+//                   %"class.impala::TupleRow"* %row) #33 {
 // entry:
+//   %0 = alloca { i64, i8* }
+//   %1 = load i8*, i8** inttoptr (i64 230325056 to i8**)
+//   %2 = load i8*, i8** inttoptr (i64 230325064 to i8**)
 //   %result = call i64 @GetSlotRef(%"class.impala::ExprContext"* inttoptr
-//                                  (i64 146381856 to %"class.impala::ExprContext"*),
-//                                  %"class.impala::TupleRow"* %row)
-//   %0 = trunc i64 %result to i1
-//   br i1 %0, label %null, label %not_null
+//             (i64 165557504 to %"class.impala::ExprContext"*),
+//             %"class.impala::TupleRow"* %row)
+//   %is_null = trunc i64 %result to i1
+//   %null_byte_loc = getelementptr i8, i8* %2, i32 0
+//   %3 = load i8, i8* %null_byte_loc
+//   %4 = icmp ne i8 %3, 0
+//   %loc = getelementptr i8, i8* %1, i32 0
+//   %row_val = bitcast i8* %loc to i32*
+//   br i1 %is_null, label %null, label %not_null
 //
-// false_block:                            ; preds = %not_null2, %null1, %not_null, %null
+// false_block:                ; preds = %cmp9, %not_null2, %null1, %cmp, %not_null, %null
 //   ret i1 false
 //
 // null:                                             ; preds = %entry
-//   br i1 false, label %continue, label %false_block
+//   br i1 %4, label %continue, label %false_block
 //
 // not_null:                                         ; preds = %entry
-//   %1 = load i32* inttoptr (i64 104774368 to i32*)
-//   %2 = ashr i64 %result, 32
-//   %3 = trunc i64 %2 to i32
-//   %cmp_raw = icmp eq i32 %3, %1
-//   br i1 %cmp_raw, label %continue, label %false_block
+//   br i1 %4, label %false_block, label %cmp
 //
-// continue:                                         ; preds = %not_null, %null
-//   %result4 = call { i64, i8* } @GetSlotRef1(
-//       %"class.impala::ExprContext"* inttoptr
-//       (i64 146381696 to %"class.impala::ExprContext"*),
-//       %"class.impala::TupleRow"* %row)
-//   %4 = extractvalue { i64, i8* } %result4, 0
-//   %5 = trunc i64 %4 to i1
-//   br i1 %5, label %null1, label %not_null2
+// continue:                                         ; preds = %cmp, %null
+//   %result4 = call { i64, i8* } @GetSlotRef.2(%"class.impala::ExprContext"*
+//              inttoptr (i64 165557696 to %"class.impala::ExprContext"*),
+//              %"class.impala::TupleRow"* %row)
+//   %5 = extractvalue { i64, i8* } %result4, 0
+//   %is_null5 = trunc i64 %5 to i1
+//   %null_byte_loc6 = getelementptr i8, i8* %2, i32 1
+//   %6 = load i8, i8* %null_byte_loc6
+//   %7 = icmp ne i8 %6, 0
+//   %loc7 = getelementptr i8, i8* %1, i32 8
+//   %row_val8 = bitcast i8* %loc7 to %"struct.impala::StringValue"*
+//   br i1 %is_null5, label %null1, label %not_null2
+//
+// cmp:                                              ; preds = %not_null
+//   %8 = load i32, i32* %row_val
+//   %9 = ashr i64 %result, 32
+//   %10 = trunc i64 %9 to i32
+//   %cmp_raw = icmp eq i32 %10, %8
+//   br i1 %cmp_raw, label %continue, label %false_block
 //
 // null1:                                            ; preds = %continue
-//   br i1 false, label %continue3, label %false_block
+//   br i1 %7, label %continue3, label %false_block
 //
 // not_null2:                                        ; preds = %continue
-//   %6 = extractvalue { i64, i8* } %result4, 0
-//   %7 = ashr i64 %6, 32
-//   %8 = trunc i64 %7 to i32
-//   %result5 = extractvalue { i64, i8* } %result4, 1
-//   %cmp_raw6 = call i1 @_Z11StringValEQPciPKN6impala11StringValueE(
-//       i8* %result5, i32 %8, %"struct.impala::StringValue"* inttoptr
-//       (i64 104774384 to %"struct.impala::StringValue"*))
-//   br i1 %cmp_raw6, label %continue3, label %false_block
+//   br i1 %7, label %false_block, label %cmp9
 //
-// continue3:                                        ; preds = %not_null2, %null1
+// continue3:                                        ; preds = %cmp9, %null1
 //   ret i1 true
+//
+// cmp9:                                             ; preds = %not_null2
+//   store { i64, i8* } %result4, { i64, i8* }* %0
+//   %11 = bitcast { i64, i8* }* %0 to %"struct.impala_udf::StringVal"*
+//   %cmp_raw10 = call i1 @_Z13StringValueEqRKN10impala_udf9StringValERKN6
+//                impala11StringValueE(%"struct.impala_udf::StringVal"* %11,
+//                %"struct.impala::StringValue"* %row_val8)
+//   br i1 %cmp_raw10, label %continue3, label %false_block
 // }
 Status HashTableCtx::CodegenEquals(RuntimeState* state, bool force_null_equality,
     Function** fn) {
@@ -828,7 +1053,7 @@ Status HashTableCtx::CodegenEquals(RuntimeState* state, bool force_null_equality
   Type* this_type = codegen->GetType(HashTableCtx::LLVM_CLASS_NAME);
   DCHECK(this_type != NULL);
   PointerType* this_ptr_type = PointerType::get(this_type, 0);
-
+  PointerType* buffer_ptr_type = PointerType::get(codegen->ptr_type(), 0);
   LlvmCodeGen::FnPrototype prototype(codegen, "Equals", codegen->GetType(TYPE_BOOLEAN));
   prototype.AddArgument(LlvmCodeGen::NamedVariable("this_ptr", this_ptr_type));
   prototype.AddArgument(LlvmCodeGen::NamedVariable("row", tuple_row_ptr_type));
@@ -839,6 +1064,16 @@ Status HashTableCtx::CodegenEquals(RuntimeState* state, bool force_null_equality
   *fn = prototype.GeneratePrototype(&builder, args);
   Value* row = args[1];
 
+  // Load cur_expr_values_ into an LLVM pointer.
+  Value* cur_expr_values_ptr = codegen->CastPtrToLlvmPtr(buffer_ptr_type,
+      &expr_values_cache_.cur_expr_values_);
+  Value* cur_expr_values = builder.CreateLoad(cur_expr_values_ptr);
+
+  // Load cur_expr_values_null_ into an LLVM pointer.
+  Value* cur_expr_values_null_ptr = codegen->CastPtrToLlvmPtr(buffer_ptr_type,
+      &expr_values_cache_.cur_expr_values_null_);
+  Value* cur_expr_values_null = builder.CreateLoad(cur_expr_values_null_ptr);
+
   BasicBlock* false_block = BasicBlock::Create(context, "false_block", *fn);
   for (int i = 0; i < build_expr_ctxs_.size(); ++i) {
     BasicBlock* null_block = BasicBlock::Create(context, "null", *fn);
@@ -862,25 +1097,26 @@ Status HashTableCtx::CodegenEquals(RuntimeState* state, bool force_null_equality
         build_expr_ctxs_[i]->root()->type(), expr_fn, expr_fn_args, "result");
     Value* is_null = result.GetIsNull();
 
-    // Determine if row is null (i.e. expr_value_null_bits_[i] == true). In
+    // Determine if row is null (i.e. cur_expr_values_null_[i] == true). In
     // the case where the hash table does not store nulls, this is always false.
     Value* row_is_null = codegen->false_value();
-    uint8_t* null_byte_loc = &expr_value_null_bits_[i];
 
     // We consider null values equal if we are comparing build rows or if the join
     // predicate is <=>
     if (force_null_equality || finds_nulls_[i]) {
-      Value* llvm_null_byte_loc =
-          codegen->CastPtrToLlvmPtr(codegen->ptr_type(), null_byte_loc);
+      Value* llvm_null_byte_loc = builder.CreateGEP(NULL, cur_expr_values_null,
+          codegen->GetIntConstant(TYPE_INT, i), "null_byte_loc");
       Value* null_byte = builder.CreateLoad(llvm_null_byte_loc);
       row_is_null = builder.CreateICmpNE(null_byte,
           codegen->GetIntConstant(TYPE_TINYINT, 0));
     }
 
-    // Get llvm value for row_val from 'expr_values_buffer_'
-    void* loc = expr_values_buffer_ + expr_values_buffer_offsets_[i];
-    Value* row_val = codegen->CastPtrToLlvmPtr(
-        codegen->GetPtrType(build_expr_ctxs_[i]->root()->type()), loc);
+    // Get llvm value for row_val from 'cur_expr_values_'
+    int offset = expr_values_cache_.expr_values_offsets(i);
+    Value* loc = builder.CreateGEP(NULL, cur_expr_values,
+        codegen->GetIntConstant(TYPE_INT, offset), "loc");
+    Value* row_val = builder.CreatePointerCast(loc,
+        codegen->GetPtrType(build_expr_ctxs_[i]->root()->type()), "row_val");
 
     // Branch for GetValue() returning NULL
     builder.CreateCondBr(is_null, null_block, not_null_block);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a59408b5/be/src/exec/hash-table.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hash-table.h b/be/src/exec/hash-table.h
index 673822e..6fc4169 100644
--- a/be/src/exec/hash-table.h
+++ b/be/src/exec/hash-table.h
@@ -111,10 +111,12 @@ class HashTableCtx {
   ///  - probe_exprs are used during FindProbeRow()
   ///  - stores_nulls: if false, TupleRows with nulls are ignored during Insert
   ///  - finds_nulls: if finds_nulls[i] is false, FindProbeRow() returns End() for
-  ///      TupleRows with nulls in position i even if stores_nulls is true.
-  ///  - initial_seed: Initial seed value to use when computing hashes for rows with
+  ///        TupleRows with nulls in position i even if stores_nulls is true.
+  ///  - initial_seed: initial seed value to use when computing hashes for rows with
   ///    level 0. Other levels have their seeds derived from this seed.
-  ///  - The max levels we will hash with.
+  ///  - max_levels: the max levels we will hash with.
+  ///  - tracker: the memory tracker of the exec node which owns this hash table context.
+  ///        Memory usage of expression values cache is charged against it.
   /// TODO: stores_nulls is too coarse: for a hash table in which some columns are joined
   ///       with '<=>' and others with '=', stores_nulls could distinguish between columns
   ///       in which nulls are stored and columns in which they are not, which could save
@@ -122,7 +124,18 @@ class HashTableCtx {
   HashTableCtx(const std::vector<ExprContext*>& build_expr_ctxs,
       const std::vector<ExprContext*>& probe_expr_ctxs, bool stores_nulls,
       const std::vector<bool>& finds_nulls, int32_t initial_seed, int max_levels,
-      int num_build_tuples);
+      MemTracker* tracker);
+
+  /// Creates a hash table context with the specified parameters, invokes Init() to
+  /// initialize the new hash table context and returns it in 'ht_ctx'. See the
+  /// comments on the HashTableCtx constructor for details of the parameters.
+  /// 'num_build_tuples' is the number of tuples of a row in the build side, used for
+  /// computing the size of a scratch row.
+  static Status Create(RuntimeState* state,
+      const std::vector<ExprContext*>& build_expr_ctxs,
+      const std::vector<ExprContext*>& probe_expr_ctxs, bool stores_nulls,
+      const std::vector<bool>& finds_nulls, int32_t initial_seed, int max_levels,
+      int num_build_tuples, MemTracker* tracker, boost::scoped_ptr<HashTableCtx>* ht_ctx);
 
   /// Call to cleanup any resources.
   void Close();
@@ -135,29 +148,28 @@ class HashTableCtx {
 
   TupleRow* ALWAYS_INLINE scratch_row() const { return scratch_row_; }
 
-  /// Returns the results of the exprs at 'expr_idx' evaluated over the last row
-  /// processed.
+  /// Returns the results of the expression at 'expr_idx' evaluated at the current row.
   /// This value is invalid if the expr evaluated to NULL.
   /// TODO: this is an awkward abstraction but aggregation node can take advantage of
   /// it and save some expr evaluation calls.
-  void* ALWAYS_INLINE last_expr_value(int expr_idx) const {
-    return expr_values_buffer_ + expr_values_buffer_offsets_[expr_idx];
+  void* ALWAYS_INLINE ExprValue(int expr_idx) const {
+    return expr_values_cache_.ExprValuePtr(expr_idx);
   }
 
-  /// Returns if the expr at 'expr_idx' evaluated to NULL for the last row.
-  bool ALWAYS_INLINE last_expr_value_null(int expr_idx) const {
-    return expr_value_null_bits_[expr_idx];
+  /// Returns true if the expression at 'expr_idx' evaluated to NULL for the current row.
+  bool ALWAYS_INLINE ExprValueNull(int expr_idx) const {
+    return static_cast<bool>(*expr_values_cache_.ExprValueNullPtr(expr_idx));
   }
 
-  /// Evaluate and hash the build/probe row, returning in *hash. Returns false if this
-  /// row should be rejected (doesn't need to be processed further) because it
-  /// contains NULL.
-  /// These need to be inlined in the IR module so we can find and replace the calls to
-  /// EvalBuildRow()/EvalProbeRow().
-  bool IR_ALWAYS_INLINE EvalAndHashBuild(TupleRow* row, uint32_t* hash);
-  bool IR_ALWAYS_INLINE EvalAndHashProbe(TupleRow* row, uint32_t* hash);
-
-  int ALWAYS_INLINE results_buffer_size() const { return results_buffer_size_; }
+  /// Evaluate and hash the build/probe row, saving the evaluation to the current row of
+  /// the ExprValuesCache in this hash table context: the results are saved in
+  /// 'cur_expr_values_', the nullness of expression values in 'cur_expr_values_null_',
+  /// and the hashed expression values in 'cur_expr_values_hash_'. Returns false if this
+  /// row should be rejected (doesn't need to be processed further) because it contains
+  /// NULL. These need to be inlined in the IR module so we can find and replace the
+  /// calls to EvalBuildRow()/EvalProbeRow().
+  bool IR_ALWAYS_INLINE EvalAndHashBuild(TupleRow* row);
+  bool IR_ALWAYS_INLINE EvalAndHashProbe(TupleRow* row);
 
   /// Codegen for evaluating a tuple row.  Codegen'd function matches the signature
   /// for EvalBuildRow and EvalTupleRow.
@@ -165,13 +177,13 @@ class HashTableCtx {
   Status CodegenEvalRow(RuntimeState* state, bool build_row, llvm::Function** fn);
 
   /// Codegen for evaluating a TupleRow and comparing equality against
-  /// 'expr_values_buffer_'.  Function signature matches HashTable::Equals().
+  /// 'cur_expr_values_'.  Function signature matches HashTable::Equals().
   /// 'force_null_equality' is true if the generated equality function should treat
   /// all NULLs as equal. See the template parameter to HashTable::Equals().
   Status CodegenEquals(RuntimeState* state, bool force_null_equality,
       llvm::Function** fn);
 
-  /// Codegen for hashing the expr values in 'expr_values_buffer_'. Function prototype
+  /// Codegen for hashing the expr values in 'cur_expr_values_'. Function prototype
   /// matches HashCurrentRow identically. Unlike HashCurrentRow(), the returned function
   /// only uses a single hash function, rather than switching based on level_.
   /// If 'use_murmur' is true, murmur hash is used, otherwise CRC is used if the hardware
@@ -180,36 +192,198 @@ class HashTableCtx {
 
   static const char* LLVM_CLASS_NAME;
 
+  /// To enable prefetching, the hash table building and probing are pipelined by the
+  /// exec nodes. A set of rows in a row batch will be evaluated and hashed first and
+  /// the corresponding hash table buckets are prefetched before they are probed against
+  /// the hash table. ExprValuesCache is a container for caching the results of
+  /// expression evaluations for the rows in a prefetch set to avoid re-evaluating the
+  /// rows during probing. Expression evaluation can be very expensive.
+  ///
+  /// The expression evaluation results are cached in the following data structures:
+  ///
+  /// - 'expr_values_array_' is an array caching the results of the rows
+  /// evaluated against either the build or probe expressions. 'cur_expr_values_'
+  /// is a pointer into this array.
+  /// - 'expr_values_null_array_' is an array caching the nullness of each evaluated
+  /// expression in each row. 'cur_expr_values_null_' is a pointer into this array.
+  /// - 'expr_values_hash_array_' is an array of cached hash values of the rows.
+  /// 'cur_expr_values_hash_' is a pointer into this array.
+  /// - 'null_bitmap_' is a bitmap which indicates rows evaluated to NULL.
+  ///
+  /// ExprValuesCache provides an iterator like interface for performing a write pass
+  /// followed by a read pass. We refrain from providing an interface for random accesses
+  /// as there isn't a use case for it now and we want to avoid expensive multiplication
+  /// as the buffer size of each row is not necessarily a power of two:
+  /// - Reset(), ResetForRead(): reset the iterators before writing / reading cached values.
+  /// - NextRow(): moves the iterators to point to the next row of cached values.
+  /// - AtEnd(): returns true if all cached rows have been read. Valid in read mode only.
+  ///
+  /// Metadata such as the layout of the results buffer is also stored in
+  /// this class. Note that the result buffer doesn't store variable length data. It only
+  /// contains pointers to the variable length data (e.g. if an expression value is a
+  /// StringValue).
+  ///
+  class ExprValuesCache {
+   public:
+    ExprValuesCache();
+
+    /// Allocates memory and initializes various data structures. Returns error status
+    /// if the memory allocation would exceed the memory limit of the exec node.
+    /// 'tracker' is the memory tracker of the exec node which owns this HashTableCtx.
+    Status Init(RuntimeState* state, MemTracker* tracker,
+        const std::vector<ExprContext*>& build_expr_ctxs);
+
+    /// Frees up various resources and updates memory tracker with proper accounting.
+    /// 'tracker' should be the same memory tracker which was passed in for Init().
+    void Close(MemTracker* tracker);
+
+    /// Resets the cache states (iterators, end pointers etc.) before writing.
+    void Reset();
+
+    /// Resets the iterators to the start before reading. Will record the current position
+    /// of the iterators in end pointer before resetting so AtEnd() can determine if all
+    /// cached values have been read.
+    void ResetForRead();
+
+    /// Advances the iterators to the next row by moving to the next entries in the
+    /// arrays of cached values.
+    void ALWAYS_INLINE NextRow();
+
+    /// Computes the total memory usage of an ExprValuesCache with the given parameters.
+    static int MemUsage(int capacity, int results_buffer_size, int num_build_exprs);
+
+    /// Returns the maximum number of rows of expression values which can be cached.
+    int ALWAYS_INLINE capacity() const { return capacity_; }
+
+    /// Returns the total size in bytes of a row of evaluated expressions' values.
+    int ALWAYS_INLINE expr_values_bytes_per_row() const {
+      return expr_values_bytes_per_row_;
+    }
+
+    /// Returns the offset into the result buffer of the first variable length
+    /// data results.
+    int ALWAYS_INLINE var_result_offset() const { return var_result_offset_; }
+
+    /// Returns true if the current read pass is complete, meaning all cached values
+    /// have been read.
+    bool ALWAYS_INLINE AtEnd() const {
+      return cur_expr_values_hash_ == cur_expr_values_hash_end_;
+    }
+
+    /// Returns true if the current row is null but nulls are not considered in the current
+    /// phase (build or probe).
+    bool ALWAYS_INLINE IsRowNull() const { return null_bitmap_.Get<false>(CurIdx()); }
+
+    /// Records in a bitmap that the current row is null but nulls are not considered in
+    /// the current phase (build or probe).
+    void ALWAYS_INLINE SetRowNull() { null_bitmap_.Set<false>(CurIdx(), true); }
+
+    /// Returns the hash value of the current row.
+    uint32_t ALWAYS_INLINE ExprValuesHash() const { return *cur_expr_values_hash_; }
+
+    /// Sets the hash value for the current row.
+    void ALWAYS_INLINE SetExprValuesHash(uint32_t hash) { *cur_expr_values_hash_ = hash; }
+
+    /// Returns a pointer to the expression value at 'expr_idx' for the current row.
+    uint8_t* ExprValuePtr(int expr_idx) const;
+
+    /// Returns a pointer to the byte indicating the nullness of the expression value
+    /// at 'expr_idx' for the current row.
+    uint8_t* ExprValueNullPtr(int expr_idx) const;
+
+    /// Returns the offset into the results buffer of the expression value at 'expr_idx'.
+    int ALWAYS_INLINE expr_values_offsets(int expr_idx) const {
+      return expr_values_offsets_[expr_idx];
+    }
+
+   private:
+    friend class HashTableCtx;
+
+    /// Resets the iterators to the beginning of the cached values' arrays.
+    void ResetIterators();
+
+    /// Returns the offset in number of rows into the cached values' buffer.
+    int ALWAYS_INLINE CurIdx() const {
+      return cur_expr_values_hash_ - expr_values_hash_array_.get();
+    }
+
+    /// Max amount of memory in bytes for caching evaluated expression values.
+    static const int MAX_EXPR_VALUES_ARRAY_SIZE = 256 << 10;
+
+    /// Maximum number of rows of expression evaluation states which this
+    /// ExprValuesCache can cache.
+    int capacity_;
+
+    /// Byte size of a row of evaluated expression values. Never changes once set,
+    /// can be used for constant substitution during codegen.
+    int expr_values_bytes_per_row_;
+
+    /// Number of build/probe expressions.
+    int num_exprs_;
+
+    /// Pointer into 'expr_values_array_' for the current row's expression values.
+    uint8_t* cur_expr_values_;
+
+    /// Pointer into 'expr_values_null_array_' for the current row's nullness of each
+    /// expression value.
+    uint8_t* cur_expr_values_null_;
+
+    /// Pointer into 'expr_values_hash_array_' for the hash value of the current row's
+    /// expression values.
+    uint32_t* cur_expr_values_hash_;
+
+    /// Pointer to the buffer one beyond the end of the last entry of cached expressions'
+    /// hash values.
+    uint32_t* cur_expr_values_hash_end_;
+
+    /// Array for caching up to 'capacity_' number of rows worth of evaluated expression
+    /// values. Each row consumes 'expr_values_bytes_per_row_' number of bytes.
+    boost::scoped_array<uint8_t> expr_values_array_;
+
+    /// Array for caching up to 'capacity_' number of rows worth of null booleans.
+    /// Each row contains 'num_exprs_' booleans to indicate nullness of expression values.
+    /// Used when the hash table supports NULL. Use 'uint8_t' to guarantee each entry is 1
+    /// byte as sizeof(bool) is implementation dependent. The IR depends on this
+    /// assumption.
+    boost::scoped_array<uint8_t> expr_values_null_array_;
+
+    /// Array for caching up to 'capacity_' number of rows worth of hashed values.
+    boost::scoped_array<uint32_t> expr_values_hash_array_;
+
+    /// One bit for each row. A bit is set if that row is not hashed because it evaluated
+    /// to NULL and the hash table doesn't support NULL. Such rows may still be included
+    /// in outputs for certain join types (e.g. left anti joins).
+    Bitmap null_bitmap_;
+
+    /// Maps from expression index to the byte offset into a row of expression values.
+    /// One entry per build/probe expression.
+    std::vector<int> expr_values_offsets_;
+
+    /// Byte offset into 'cur_expr_values_' that begins the variable length results for
+    /// a row. If -1, there are no variable length slots. Never changes once set, can be
+    /// constant substituted with codegen.
+    int var_result_offset_;
+  };
+
+  ExprValuesCache* ALWAYS_INLINE expr_values_cache() { return &expr_values_cache_; }
+
  private:
   friend class HashTable;
   friend class HashTableTest_HashEmpty_Test;
 
+  /// Allocate various buffers for storing expression evaluation results, hash values,
+  /// null bits etc. Returns error if allocation causes query memory limit to be exceeded.
+  Status Init(RuntimeState* state, int num_build_tuples);
+
   /// Compute the hash of the values in expr_values_buffer_.
   /// This will be replaced by codegen.  We don't want this inlined for replacing
   /// with codegen'd functions so the function name does not change.
-  uint32_t IR_NO_INLINE HashCurrentRow() const {
-    DCHECK_LT(level_, seeds_.size());
-    if (var_result_begin_ == -1) {
-      /// This handles NULLs implicitly since a constant seed value was put
-      /// into results buffer for nulls.
-      /// TODO: figure out which hash function to use. We need to generate uncorrelated
-      /// hashes by changing just the seed. CRC does not have this property and FNV is
-      /// okay. We should switch to something else.
-      return Hash(expr_values_buffer_, results_buffer_size_, seeds_[level_]);
-    } else {
-      return HashTableCtx::HashVariableLenRow();
-    }
-  }
+  uint32_t IR_NO_INLINE HashCurrentRow() const;
 
   /// Wrapper function for calling correct HashUtil function in non-codegen'd case.
-  uint32_t inline Hash(const void* input, int len, uint32_t hash) const {
-    /// Use CRC hash at first level for better performance. Switch to murmur hash at
-    /// subsequent levels since CRC doesn't randomize well with different seed inputs.
-    if (level_ == 0) return HashUtil::Hash(input, len, hash);
-    return HashUtil::MurmurHash2_64(input, len, hash);
-  }
+  uint32_t Hash(const void* input, int len, uint32_t hash) const;
 
-  /// Evaluate 'row' over build exprs caching the results in 'expr_values_buffer_' This
+  /// Evaluate 'row' over build exprs caching the results in 'cur_expr_values_' This
   /// will be replaced by codegen.  We do not want this function inlined when cross
   /// compiled because we need to be able to differentiate between EvalBuildRow and
   /// EvalProbeRow by name and the build/probe exprs are baked into the codegen'd
@@ -218,7 +392,7 @@ class HashTableCtx {
     return EvalRow(row, build_expr_ctxs_);
   }
 
-  /// Evaluate 'row' over probe exprs caching the results in 'expr_values_buffer_'
+  /// Evaluate 'row' over probe exprs caching the results in 'cur_expr_values_'
   /// This will be replaced by codegen.
   bool IR_NO_INLINE EvalProbeRow(TupleRow* row) {
     return EvalRow(row, probe_expr_ctxs_);
@@ -228,15 +402,15 @@ class HashTableCtx {
   /// fields (e.g. strings).
   uint32_t HashVariableLenRow() const;
 
-  /// Evaluate the exprs over row and cache the results in 'expr_values_buffer_'.
+  /// Evaluate the exprs over row and cache the results in 'cur_expr_values_'.
   /// Returns whether any expr evaluated to NULL.
   /// This will be replaced by codegen.
   bool EvalRow(TupleRow* row, const std::vector<ExprContext*>& ctxs);
 
   /// Returns true if the values of build_exprs evaluated over 'build_row' equal the
-  /// values cached in 'expr_values_buffer_'.  This will be replaced by codegen.
+  /// values cached in 'cur_expr_values_'.  This will be replaced by codegen.
   /// FORCE_NULL_EQUALITY is true if all nulls should be treated as equal, regardless
-  /// of the values of finds_nulls_
+  /// of the values of 'finds_nulls_'.
   template<bool FORCE_NULL_EQUALITY>
   bool IR_NO_INLINE Equals(TupleRow* build_row) const;
 
@@ -263,29 +437,16 @@ class HashTableCtx {
   /// The seeds to use for hashing. Indexed by the level.
   std::vector<uint32_t> seeds_;
 
-  /// Cache of exprs values for the current row being evaluated.  This can either
-  /// be a build row (during Insert()) or probe row (during FindProbeRow()).
-  std::vector<int> expr_values_buffer_offsets_;
-
-  /// Byte offset into 'expr_values_buffer_' that begins the variable length results.
-  /// If -1, there are no variable length slots. Never changes once set, can be removed
-  /// with codegen.
-  int var_result_begin_;
-
-  /// Byte size of 'expr_values_buffer_'. Never changes once set, can be removed with
-  /// codegen.
-  int results_buffer_size_;
-
-  /// Buffer to store evaluated expr results.  This address must not change once
-  /// allocated since the address is baked into the codegen.
-  uint8_t* expr_values_buffer_;
-
-  /// Use bytes instead of bools to be compatible with llvm.  This address must
-  /// not change once allocated.
-  uint8_t* expr_value_null_bits_;
+  /// The ExprValuesCache for caching expression evaluation results, null bytes and hash
+  /// values for rows. Used to store results of batch evaluations of rows.
+  ExprValuesCache expr_values_cache_;
 
   /// Scratch buffer to generate rows on the fly.
   TupleRow* scratch_row_;
+
+  /// Memory tracker of the exec node which owns this hash table context. The memory
+  /// usage of the expression values cache is charged against it.
+  MemTracker* tracker_;
 };
 
 /// The hash table consists of a contiguous array of buckets that contain a pointer to the
@@ -381,26 +542,27 @@ class HashTable {
   /// the insert fails and this function returns false.
   /// Used during the build phase of hash joins.
   bool IR_ALWAYS_INLINE Insert(HashTableCtx* ht_ctx,
-      const BufferedTupleStream::RowIdx& idx, TupleRow* row, uint32_t hash);
+      const BufferedTupleStream::RowIdx& idx, TupleRow* row);
 
   /// Prefetch the hash table bucket which the given hash value 'hash' maps to.
   template<const bool READ>
   void IR_ALWAYS_INLINE PrefetchBucket(uint32_t hash);
 
-  /// Returns an iterator to the bucket matching the last row evaluated in 'ht_ctx'.
-  /// Returns HashTable::End() if no match is found. The iterator can be iterated until
-  /// HashTable::End() to find all the matching rows. Advancing the returned iterator will
-  /// go to the next matching row. The matching rows do not need to be evaluated since all
-  /// the nodes of a bucket are duplicates. One scan can be in progress for each 'ht_ctx'.
-  /// Used during the probe phase of hash joins.
-  Iterator IR_ALWAYS_INLINE FindProbeRow(HashTableCtx* ht_ctx, uint32_t hash);
+  /// Returns an iterator to the bucket that matches the probe expression results that
+  /// are cached at the current position of the ExprValuesCache in 'ht_ctx'. Assumes that
+  /// the ExprValuesCache was filled using EvalAndHashProbe(). Returns HashTable::End()
+  /// if no match is found. The iterator can be iterated until HashTable::End() to find
+  /// all the matching rows. Advancing the returned iterator will go to the next matching
+  /// row. The matching rows do not need to be evaluated since all the nodes of a bucket
+  /// are duplicates. One scan can be in progress for each 'ht_ctx'. Used in the probe
+  /// phase of hash joins.
+  Iterator IR_ALWAYS_INLINE FindProbeRow(HashTableCtx* ht_ctx);
 
   /// If a match is found in the table, return an iterator as in FindProbeRow(). If a
   /// match was not present, return an iterator pointing to the empty bucket where the key
   /// should be inserted. Returns End() if the table is full. The caller can set the data
   /// in the bucket using a Set*() method on the iterator.
-  Iterator IR_ALWAYS_INLINE FindBuildRowBucket(HashTableCtx* ht_ctx, uint32_t hash,
-      bool* found);
+  Iterator IR_ALWAYS_INLINE FindBuildRowBucket(HashTableCtx* ht_ctx, bool* found);
 
   /// Returns number of elements inserted in the hash table
   int64_t size() const {
@@ -531,6 +693,10 @@ class HashTable {
     /// Returns true if this iterator is at the end, i.e. GetRow() cannot be called.
     bool ALWAYS_INLINE AtEnd() const { return bucket_idx_ == BUCKET_NOT_FOUND; }
 
+    /// Prefetch the hash table bucket which the iterator is pointing to now.
+    template<const bool READ>
+    void IR_ALWAYS_INLINE PrefetchBucket();
+
    private:
     friend class HashTable;
 
@@ -579,28 +745,24 @@ class HashTable {
   /// Using the returned index value, the caller can create an iterator that can be
   /// iterated until End() to find all the matching rows.
   ///
-  /// If 'row' is not NULL, 'row' will be evaluated once against either the build or
-  /// probe exprs (determined by the parameter 'is_build') before calling Equals().
-  /// If 'row' is NULL, EvalAndHashBuild() or EvalAndHashProbe() must have been called
-  /// before calling this function.
+  /// EvalAndHashBuild() or EvalAndHashProbe() must have been called before calling
+  /// this function. The values of the expression values cache in 'ht_ctx' will be
+  /// used to probe the hash table.
   ///
   /// 'FORCE_NULL_EQUALITY' is true if NULLs should always be considered equal when
   /// comparing two rows.
   ///
-  /// 'is_build' indicates which of build or probe exprs is used for lazy evaluation.
-  /// 'row' is the row being probed against the hash table. Used for lazy evaluation.
   /// 'hash' is the hash computed by EvalAndHashBuild() or EvalAndHashProbe().
   /// 'found' indicates that a bucket that contains an equal row is found.
   ///
   /// There are wrappers of this function that perform the Find and Insert logic.
   template <bool FORCE_NULL_EQUALITY>
-  int64_t IR_ALWAYS_INLINE Probe(Bucket* buckets, int64_t num_buckets, bool is_build,
-      HashTableCtx* ht_ctx, TupleRow* row, uint32_t hash, bool* found);
+  int64_t IR_ALWAYS_INLINE Probe(Bucket* buckets, int64_t num_buckets,
+      HashTableCtx* ht_ctx, uint32_t hash, bool* found);
 
   /// Performs the insert logic. Returns the HtData* of the bucket or duplicate node
   /// where the data should be inserted. Returns NULL if the insert was not successful.
-  HtData* IR_ALWAYS_INLINE InsertInternal(HashTableCtx* ht_ctx, TupleRow* row,
-      uint32_t hash);
+  HtData* IR_ALWAYS_INLINE InsertInternal(HashTableCtx* ht_ctx);
 
   /// Updates 'bucket_idx' to the index of the next non-empty bucket. If the bucket has
   /// duplicates, 'node' will be pointing to the head of the linked list of duplicates.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a59408b5/be/src/exec/hash-table.inline.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hash-table.inline.h b/be/src/exec/hash-table.inline.h
index 0d4b1b6..6d33869 100644
--- a/be/src/exec/hash-table.inline.h
+++ b/be/src/exec/hash-table.inline.h
@@ -23,23 +23,30 @@
 
 namespace impala {
 
-inline bool HashTableCtx::EvalAndHashBuild(TupleRow* row, uint32_t* hash) {
+inline bool HashTableCtx::EvalAndHashBuild(TupleRow* row) {
   bool has_null = EvalBuildRow(row);
   if (!stores_nulls_ && has_null) return false;
-  *hash = HashCurrentRow();
+  expr_values_cache_.SetExprValuesHash(HashCurrentRow());  // set only when returning true
   return true;
 }
 
-inline bool HashTableCtx::EvalAndHashProbe(TupleRow* row, uint32_t* hash) {
+inline bool HashTableCtx::EvalAndHashProbe(TupleRow* row) {
   bool has_null = EvalProbeRow(row);
   if (has_null && !(stores_nulls_ && finds_some_nulls_)) return false;
-  *hash = HashCurrentRow();
+  expr_values_cache_.SetExprValuesHash(HashCurrentRow());  // set only when returning true
   return true;
 }
 
+inline void HashTableCtx::ExprValuesCache::NextRow() {  // advance cursors one row
+  cur_expr_values_ += expr_values_bytes_per_row_;  // skip this row's expr values
+  cur_expr_values_null_ += num_exprs_;  // one null slot per expr
+  ++cur_expr_values_hash_;  // one cached hash per row
+  DCHECK_LE(cur_expr_values_hash_ - expr_values_hash_array_.get(), capacity_);
+}
+
 template <bool FORCE_NULL_EQUALITY>
 inline int64_t HashTable::Probe(Bucket* buckets, int64_t num_buckets,
-    bool is_build, HashTableCtx* ht_ctx, TupleRow* row, uint32_t hash, bool* found) {
+    HashTableCtx* ht_ctx, uint32_t hash, bool* found) {
   DCHECK(buckets != NULL);
   DCHECK_GT(num_buckets, 0);
   *found = false;
@@ -49,20 +56,10 @@ inline int64_t HashTable::Probe(Bucket* buckets, int64_t num_buckets,
   // for knowing when to exit the loop (e.g. by capping the total travel length). In case
   // of quadratic probing it is also used for calculating the length of the next jump.
   int64_t step = 0;
-  bool need_eval = row != NULL;
   do {
     Bucket* bucket = &buckets[bucket_idx];
     if (LIKELY(!bucket->filled)) return bucket_idx;
     if (hash == bucket->hash) {
-      // Evaluate 'row' if needed before calling Equals() for the first time in this loop.
-      if (need_eval) {
-        if (is_build) {
-          ht_ctx->EvalBuildRow(row);
-        } else {
-          ht_ctx->EvalProbeRow(row);
-        }
-        need_eval = false;
-      }
       if (ht_ctx != NULL &&
           ht_ctx->Equals<FORCE_NULL_EQUALITY>(GetRow(bucket, ht_ctx->scratch_row_))) {
         *found = true;
@@ -89,12 +86,11 @@ inline int64_t HashTable::Probe(Bucket* buckets, int64_t num_buckets,
   return Iterator::BUCKET_NOT_FOUND;
 }
 
-inline HashTable::HtData* HashTable::InsertInternal(HashTableCtx* ht_ctx,
-    TupleRow* row, uint32_t hash) {
+inline HashTable::HtData* HashTable::InsertInternal(HashTableCtx* ht_ctx) {
   ++num_probes_;
   bool found = false;
-  int64_t bucket_idx =
-      Probe<true>(buckets_, num_buckets_, true, ht_ctx, row, hash, &found);
+  uint32_t hash = ht_ctx->expr_values_cache()->ExprValuesHash();
+  int64_t bucket_idx = Probe<true>(buckets_, num_buckets_, ht_ctx, hash, &found);
   DCHECK_NE(bucket_idx, Iterator::BUCKET_NOT_FOUND);
   if (found) {
     // We need to insert a duplicate node, note that this may fail to allocate memory.
@@ -108,8 +104,8 @@ inline HashTable::HtData* HashTable::InsertInternal(HashTableCtx* ht_ctx,
 }
 
 inline bool HashTable::Insert(HashTableCtx* ht_ctx,
-    const BufferedTupleStream::RowIdx& idx, TupleRow* row, uint32_t hash) {
-  HtData* htdata = InsertInternal(ht_ctx, row, hash);
+    const BufferedTupleStream::RowIdx& idx, TupleRow* row) {
+  HtData* htdata = InsertInternal(ht_ctx);
   // If successful insert, update the contents of the newly inserted entry with 'idx'.
   if (LIKELY(htdata != NULL)) {
     if (stores_tuples_) {
@@ -133,11 +129,11 @@ inline void HashTable::PrefetchBucket(uint32_t hash) {
   __builtin_prefetch(&buckets_[bucket_idx], READ ? 0 : 1, 1);
 }
 
-inline HashTable::Iterator HashTable::FindProbeRow(HashTableCtx* ht_ctx, uint32_t hash) {
+inline HashTable::Iterator HashTable::FindProbeRow(HashTableCtx* ht_ctx) {
   ++num_probes_;
   bool found = false;
-  int64_t bucket_idx =
-      Probe<false>(buckets_, num_buckets_, false, ht_ctx, NULL, hash, &found);
+  uint32_t hash = ht_ctx->expr_values_cache()->ExprValuesHash();
+  int64_t bucket_idx = Probe<false>(buckets_, num_buckets_, ht_ctx, hash, &found);
   if (found) {
     return Iterator(this, ht_ctx->scratch_row(), bucket_idx,
         buckets_[bucket_idx].bucketData.duplicates);
@@ -147,10 +143,10 @@ inline HashTable::Iterator HashTable::FindProbeRow(HashTableCtx* ht_ctx, uint32_
 
 // TODO: support lazy evaluation like HashTable::Insert().
 inline HashTable::Iterator HashTable::FindBuildRowBucket(
-    HashTableCtx* ht_ctx, uint32_t hash, bool* found) {
+    HashTableCtx* ht_ctx, bool* found) {
   ++num_probes_;
-  int64_t bucket_idx =
-      Probe<true>(buckets_, num_buckets_, false, ht_ctx, NULL, hash, found);
+  uint32_t hash = ht_ctx->expr_values_cache()->ExprValuesHash();
+  int64_t bucket_idx = Probe<true>(buckets_, num_buckets_, ht_ctx, hash, found);
   DuplicateNode* duplicates = LIKELY(bucket_idx != Iterator::BUCKET_NOT_FOUND) ?
       buckets_[bucket_idx].bucketData.duplicates : NULL;
   return Iterator(this, ht_ctx->scratch_row(), bucket_idx, duplicates);
@@ -318,6 +314,16 @@ inline void HashTable::Iterator::SetAtEnd() {
   node_ = NULL;
 }
 
+template<const bool READ>  // true: prefetch for read, false: for write
+inline void HashTable::Iterator::PrefetchBucket() {
+  if (LIKELY(!AtEnd())) {  // nothing to prefetch at End()
+    // HashTable::PrefetchBucket() takes a hash, presumably masked down to a bucket
+    // index; 'bucket_idx_' has no bits outside that mask (DCHECKed), so pass it as-is.
+    DCHECK_EQ((bucket_idx_ & ~(table_->num_buckets_ - 1)), 0);
+    table_->PrefetchBucket<READ>(bucket_idx_);
+  }
+}
+
 inline void HashTable::Iterator::Next() {
   DCHECK(!AtEnd());
   if (table_->buckets_[bucket_idx_].hasDuplicates && node_->next != NULL) {