You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2016/05/17 08:30:20 UTC

[4/4] incubator-impala git commit: IMPALA-3286: Prefetching for PHJ probing.

IMPALA-3286: Prefetching for PHJ probing.

This change pipelines the code which probes the hash tables.
This is based on the idea which Mostafa presented earlier.
Essentially, all rows in a row batch will be evaluated and
hashed first before being probed against the hash tables.
Hash table buckets are prefetched as hash values of rows are
computed.

To avoid re-evaluating the rows during probing (as the rows
have already been evaluated once to compute the hash values), the hash
table context has been updated to cache the evaluated expression values,
null bits and hash values of some number of rows. The hash table context
provides a new iterator-like interface to iterate through the cached
values.

A PREFETCH_MODE query option has also been added to disable prefetching
if necessary. The default mode is 1, which means hash table buckets will
be prefetched. In the future, this mode may be extended to also support
prefetching the data of hash table buckets.

Combined with the build side prefetching, a self join of table lineitem
improves by 40% on a single node run on average:

select count(*)
from lineitem o1, lineitem o2
where o1.l_orderkey = o2.l_orderkey and
      o1.l_linenumber = o2.l_linenumber;

Change-Id: Ib42b93d99d09c833571e39d20d58c11ef73f3cc0
Reviewed-on: http://gerrit.cloudera.org:8080/2959
Reviewed-by: Michael Ho <kw...@cloudera.com>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/a59408b5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/a59408b5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/a59408b5

Branch: refs/heads/master
Commit: a59408b575ff6f48b8f0e1084ebcf7489c9de8af
Parents: b634a55
Author: Michael Ho <kw...@cloudera.com>
Authored: Wed Apr 27 17:09:50 2016 -0700
Committer: Tim Armstrong <ta...@cloudera.com>
Committed: Tue May 17 01:30:12 2016 -0700

----------------------------------------------------------------------
 be/src/exec/hash-table-test.cc                  | 169 ++++---
 be/src/exec/hash-table.cc                       | 484 ++++++++++++++-----
 be/src/exec/hash-table.h                        | 334 +++++++++----
 be/src/exec/hash-table.inline.h                 |  60 +--
 be/src/exec/partitioned-aggregation-node-ir.cc  |  14 +-
 be/src/exec/partitioned-aggregation-node.cc     |  16 +-
 be/src/exec/partitioned-hash-join-node-ir.cc    | 344 ++++++++-----
 be/src/exec/partitioned-hash-join-node.cc       | 122 +++--
 be/src/exec/partitioned-hash-join-node.h        |  62 ++-
 be/src/exec/partitioned-hash-join-node.inline.h |   1 +
 be/src/exprs/expr-context.cc                    |  23 +-
 be/src/exprs/expr-context.h                     |  10 +-
 be/src/runtime/row-batch.h                      |  22 +-
 be/src/runtime/test-env.cc                      |   1 +
 be/src/service/query-options.cc                 |  11 +
 be/src/service/query-options.h                  |   5 +-
 be/src/udf/udf-internal.h                       |   3 +
 common/thrift/ImpalaInternalService.thrift      |   3 +
 common/thrift/ImpalaService.thrift              |   5 +-
 common/thrift/Types.thrift                      |   8 +
 20 files changed, 1177 insertions(+), 520 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a59408b5/be/src/exec/hash-table-test.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hash-table-test.cc b/be/src/exec/hash-table-test.cc
index 25cd4f1..7559473 100644
--- a/be/src/exec/hash-table-test.cc
+++ b/be/src/exec/hash-table-test.cc
@@ -145,13 +145,12 @@ class HashTableTest : public testing::Test {
 
   void ProbeTest(HashTable* table, HashTableCtx* ht_ctx,
       ProbeTestData* data, int num_data, bool scan) {
-    uint32_t hash = 0;
     for (int i = 0; i < num_data; ++i) {
       TupleRow* row = data[i].probe_row;
 
       HashTable::Iterator iter;
-      if (ht_ctx->EvalAndHashProbe(row, &hash)) continue;
-      iter = table->FindProbeRow(ht_ctx, hash);
+      if (ht_ctx->EvalAndHashProbe(row)) continue;
+      iter = table->FindProbeRow(ht_ctx);
 
       if (data[i].expected_build_rows.size() == 0) {
         EXPECT_TRUE(iter.AtEnd());
@@ -183,7 +182,6 @@ class HashTableTest : public testing::Test {
       int max_num_blocks = 100, int reserved_blocks = 10) {
     EXPECT_TRUE(test_env_->CreateQueryState(0, max_num_blocks, block_size,
         &runtime_state_).ok());
-
     BufferedBlockMgr::Client* client;
     EXPECT_TRUE(runtime_state_->block_mgr()->RegisterClient("", reserved_blocks, false,
         &tracker_, runtime_state_, &client).ok());
@@ -236,19 +234,21 @@ class HashTableTest : public testing::Test {
     // Create the hash table and insert the build rows
     scoped_ptr<HashTable> hash_table;
     ASSERT_TRUE(CreateHashTable(true, 1024, &hash_table));
-    HashTableCtx ht_ctx(build_expr_ctxs_, probe_expr_ctxs_, true /* stores_nulls_ */,
-        std::vector<bool>(build_expr_ctxs_.size(), false), 1, 0, 1);
-
+    scoped_ptr<HashTableCtx> ht_ctx;
+    Status status = HashTableCtx::Create(runtime_state_, build_expr_ctxs_,
+        probe_expr_ctxs_, true /* stores_nulls_ */,
+        vector<bool>(build_expr_ctxs_.size(), false), 1, 0, 1, &tracker_, &ht_ctx);
+    EXPECT_OK(status);
     for (int i = 0; i < 2; ++i) {
-      uint32_t hash = 0;
-      if (!ht_ctx.EvalAndHashBuild(build_rows[i], &hash)) continue;
+      if (!ht_ctx->EvalAndHashBuild(build_rows[i])) continue;
       BufferedTupleStream::RowIdx dummy_row_idx;
       EXPECT_TRUE(hash_table->stores_tuples_);
-      bool inserted = hash_table->Insert(&ht_ctx, dummy_row_idx, build_rows[i], hash);
+      bool inserted = hash_table->Insert(ht_ctx.get(), dummy_row_idx, build_rows[i]);
       EXPECT_TRUE(inserted);
     }
     EXPECT_EQ(hash_table->num_buckets() - hash_table->EmptyBuckets(), 1);
     hash_table->Close();
+    ht_ctx->Close();
   }
 
   // This test inserts the build rows [0->5) to hash table. It validates that they
@@ -269,51 +269,52 @@ class HashTableTest : public testing::Test {
     // Create the hash table and insert the build rows
     scoped_ptr<HashTable> hash_table;
     ASSERT_TRUE(CreateHashTable(quadratic, initial_num_buckets, &hash_table));
-    HashTableCtx ht_ctx(build_expr_ctxs_, probe_expr_ctxs_, false,
-        std::vector<bool>(build_expr_ctxs_.size(), false), 1, 0, 1);
-
-    uint32_t hash = 0;
-    bool success = hash_table->CheckAndResize(5, &ht_ctx);
+    scoped_ptr<HashTableCtx> ht_ctx;
+    Status status = HashTableCtx::Create(runtime_state_, build_expr_ctxs_,
+        probe_expr_ctxs_, false /* !stores_nulls_ */,
+        vector<bool>(build_expr_ctxs_.size(), false), 1, 0, 1, &tracker_, &ht_ctx);
+    EXPECT_OK(status);
+    bool success = hash_table->CheckAndResize(5, ht_ctx.get());
     ASSERT_TRUE(success);
     for (int i = 0; i < 5; ++i) {
-      if (!ht_ctx.EvalAndHashBuild(build_rows[i], &hash)) continue;
+      if (!ht_ctx->EvalAndHashBuild(build_rows[i])) continue;
       BufferedTupleStream::RowIdx dummy_row_idx;
       EXPECT_TRUE(hash_table->stores_tuples_);
-      bool inserted = hash_table->Insert(&ht_ctx, dummy_row_idx, build_rows[i], hash);
+      bool inserted = hash_table->Insert(ht_ctx.get(), dummy_row_idx, build_rows[i]);
       EXPECT_TRUE(inserted);
     }
     EXPECT_EQ(hash_table->size(), 5);
 
     // Do a full table scan and validate returned pointers
-    FullScan(hash_table.get(), &ht_ctx, 0, 5, true, scan_rows, build_rows);
-    ProbeTest(hash_table.get(), &ht_ctx, probe_rows, 10, false);
+    FullScan(hash_table.get(), ht_ctx.get(), 0, 5, true, scan_rows, build_rows);
+    ProbeTest(hash_table.get(), ht_ctx.get(), probe_rows, 10, false);
 
     // Double the size of the hash table and scan again.
-    ResizeTable(hash_table.get(), 2048, &ht_ctx);
+    ResizeTable(hash_table.get(), 2048, ht_ctx.get());
     EXPECT_EQ(hash_table->num_buckets(), 2048);
     EXPECT_EQ(hash_table->size(), 5);
     memset(scan_rows, 0, sizeof(scan_rows));
-    FullScan(hash_table.get(), &ht_ctx, 0, 5, true, scan_rows, build_rows);
-    ProbeTest(hash_table.get(), &ht_ctx, probe_rows, 10, false);
+    FullScan(hash_table.get(), ht_ctx.get(), 0, 5, true, scan_rows, build_rows);
+    ProbeTest(hash_table.get(), ht_ctx.get(), probe_rows, 10, false);
 
     // Try to shrink and scan again.
-    ResizeTable(hash_table.get(), 64, &ht_ctx);
+    ResizeTable(hash_table.get(), 64, ht_ctx.get());
     EXPECT_EQ(hash_table->num_buckets(), 64);
     EXPECT_EQ(hash_table->size(), 5);
     memset(scan_rows, 0, sizeof(scan_rows));
-    FullScan(hash_table.get(), &ht_ctx, 0, 5, true, scan_rows, build_rows);
-    ProbeTest(hash_table.get(), &ht_ctx, probe_rows, 10, false);
+    FullScan(hash_table.get(), ht_ctx.get(), 0, 5, true, scan_rows, build_rows);
+    ProbeTest(hash_table.get(), ht_ctx.get(), probe_rows, 10, false);
 
     // Resize to 8, which is the smallest value to fit the number of filled buckets.
-    ResizeTable(hash_table.get(), 8, &ht_ctx);
+    ResizeTable(hash_table.get(), 8, ht_ctx.get());
     EXPECT_EQ(hash_table->num_buckets(), 8);
     EXPECT_EQ(hash_table->size(), 5);
     memset(scan_rows, 0, sizeof(scan_rows));
-    FullScan(hash_table.get(), &ht_ctx, 0, 5, true, scan_rows, build_rows);
-    ProbeTest(hash_table.get(), &ht_ctx, probe_rows, 10, false);
+    FullScan(hash_table.get(), ht_ctx.get(), 0, 5, true, scan_rows, build_rows);
+    ProbeTest(hash_table.get(), ht_ctx.get(), probe_rows, 10, false);
 
     hash_table->Close();
-    ht_ctx.Close();
+    ht_ctx->Close();
   }
 
   void ScanTest(bool quadratic, int initial_size, int rows_to_insert,
@@ -322,24 +323,26 @@ class HashTableTest : public testing::Test {
     ASSERT_TRUE(CreateHashTable(quadratic, initial_size, &hash_table));
 
     int total_rows = rows_to_insert + additional_rows;
-    HashTableCtx ht_ctx(build_expr_ctxs_, probe_expr_ctxs_, false,
-        std::vector<bool>(build_expr_ctxs_.size(), false), 1, 0, 1);
+    scoped_ptr<HashTableCtx> ht_ctx;
+    Status status = HashTableCtx::Create(runtime_state_, build_expr_ctxs_,
+        probe_expr_ctxs_, false /* !stores_nulls_ */,
+        vector<bool>(build_expr_ctxs_.size(), false), 1, 0, 1, &tracker_, &ht_ctx);
+    EXPECT_OK(status);
 
     // Add 1 row with val 1, 2 with val 2, etc.
     vector<TupleRow*> build_rows;
     ProbeTestData* probe_rows = new ProbeTestData[total_rows];
     probe_rows[0].probe_row = CreateTupleRow(0);
-    uint32_t hash = 0;
     for (int val = 1; val <= rows_to_insert; ++val) {
-      bool success = hash_table->CheckAndResize(val, &ht_ctx);
+      bool success = hash_table->CheckAndResize(val, ht_ctx.get());
       EXPECT_TRUE(success) << " failed to resize: " << val;
       probe_rows[val].probe_row = CreateTupleRow(val);
       for (int i = 0; i < val; ++i) {
         TupleRow* row = CreateTupleRow(val);
-        if (!ht_ctx.EvalAndHashBuild(row, &hash)) continue;
+        if (!ht_ctx->EvalAndHashBuild(row)) continue;
         BufferedTupleStream::RowIdx dummy_row_idx;
         EXPECT_TRUE(hash_table->stores_tuples_);
-        hash_table->Insert(&ht_ctx, dummy_row_idx, row, hash);
+        hash_table->Insert(ht_ctx.get(), dummy_row_idx, row);
         build_rows.push_back(row);
         probe_rows[val].expected_build_rows.push_back(row);
       }
@@ -351,22 +354,22 @@ class HashTableTest : public testing::Test {
     }
 
     // Test that all the builds were found.
-    ProbeTest(hash_table.get(), &ht_ctx, probe_rows, total_rows, true);
+    ProbeTest(hash_table.get(), ht_ctx.get(), probe_rows, total_rows, true);
 
     // Resize and try again.
     int target_size = BitUtil::RoundUpToPowerOfTwo(2 * total_rows);
-    ResizeTable(hash_table.get(), target_size, &ht_ctx);
+    ResizeTable(hash_table.get(), target_size, ht_ctx.get());
     EXPECT_EQ(hash_table->num_buckets(), target_size);
-    ProbeTest(hash_table.get(), &ht_ctx, probe_rows, total_rows, true);
+    ProbeTest(hash_table.get(), ht_ctx.get(), probe_rows, total_rows, true);
 
     target_size = BitUtil::RoundUpToPowerOfTwo(total_rows + 1);
-    ResizeTable(hash_table.get(), target_size, &ht_ctx);
+    ResizeTable(hash_table.get(), target_size, ht_ctx.get());
     EXPECT_EQ(hash_table->num_buckets(), target_size);
-    ProbeTest(hash_table.get(), &ht_ctx, probe_rows, total_rows, true);
+    ProbeTest(hash_table.get(), ht_ctx.get(), probe_rows, total_rows, true);
 
     delete [] probe_rows;
     hash_table->Close();
-    ht_ctx.Close();
+    ht_ctx->Close();
   }
 
   // This test continues adding tuples to the hash table and exercises the resize code
@@ -378,27 +381,29 @@ class HashTableTest : public testing::Test {
     MemTracker tracker(100 * 1024 * 1024);
     scoped_ptr<HashTable> hash_table;
     ASSERT_TRUE(CreateHashTable(quadratic, num_to_add, &hash_table));
-    HashTableCtx ht_ctx(build_expr_ctxs_, probe_expr_ctxs_, false,
-        std::vector<bool>(build_expr_ctxs_.size(), false), 1, 0, 1);
+    scoped_ptr<HashTableCtx> ht_ctx;
+    Status status = HashTableCtx::Create(runtime_state_, build_expr_ctxs_,
+        probe_expr_ctxs_, false /* !stores_nulls_ */,
+        vector<bool>(build_expr_ctxs_.size(), false), 1, 0, 1, &tracker_, &ht_ctx);
+    EXPECT_OK(status);
 
     // Inserts num_to_add + (num_to_add^2) + (num_to_add^4) + ... + (num_to_add^20)
     // entries. When num_to_add == 4, then the total number of inserts is 4194300.
     int build_row_val = 0;
-    uint32_t hash = 0;
     for (int i = 0; i < 20; ++i) {
       // Currently the mem used for the bucket is not being tracked by the mem tracker.
       // Thus the resize is expected to be successful.
       // TODO: Keep track of the mem used for the buckets and test cases where we actually
       // hit OOM.
       // TODO: Insert duplicates to also hit OOM.
-      bool success = hash_table->CheckAndResize(num_to_add, &ht_ctx);
+      bool success = hash_table->CheckAndResize(num_to_add, ht_ctx.get());
       EXPECT_TRUE(success) << " failed to resize: " << num_to_add;
       for (int j = 0; j < num_to_add; ++build_row_val, ++j) {
         TupleRow* row = CreateTupleRow(build_row_val);
-        if (!ht_ctx.EvalAndHashBuild(row, &hash)) continue;
+        if (!ht_ctx->EvalAndHashBuild(row)) continue;
         BufferedTupleStream::RowIdx dummy_row_idx;
         EXPECT_TRUE(hash_table->stores_tuples_);
-        bool inserted = hash_table->Insert(&ht_ctx, dummy_row_idx, row, hash);
+        bool inserted = hash_table->Insert(ht_ctx.get(), dummy_row_idx, row);
         if (!inserted) goto done_inserting;
       }
       expected_size += num_to_add;
@@ -410,8 +415,8 @@ class HashTableTest : public testing::Test {
     // Validate that we can find the entries before we went over the limit
     for (int i = 0; i < expected_size * 5; i += 100000) {
       TupleRow* probe_row = CreateTupleRow(i);
-      if (!ht_ctx.EvalAndHashProbe(probe_row, &hash)) continue;
-      HashTable::Iterator iter = hash_table->FindProbeRow(&ht_ctx, hash);
+      if (!ht_ctx->EvalAndHashProbe(probe_row)) continue;
+      HashTable::Iterator iter = hash_table->FindProbeRow(ht_ctx.get());
       if (i < hash_table->size()) {
         EXPECT_TRUE(!iter.AtEnd()) << " i: " << i;
         ValidateMatch(probe_row, iter.GetRow());
@@ -420,7 +425,7 @@ class HashTableTest : public testing::Test {
       }
     }
     hash_table->Close();
-    ht_ctx.Close();
+    ht_ctx->Close();
   }
 
   // This test inserts and probes as many elements as the size of the hash table without
@@ -430,9 +435,12 @@ class HashTableTest : public testing::Test {
   void InsertFullTest(bool quadratic, int table_size) {
     scoped_ptr<HashTable> hash_table;
     ASSERT_TRUE(CreateHashTable(quadratic, table_size, &hash_table));
-    HashTableCtx ht_ctx(build_expr_ctxs_, probe_expr_ctxs_, false,
-        std::vector<bool>(build_expr_ctxs_.size(), false), 1, 0, 1);
     EXPECT_EQ(hash_table->EmptyBuckets(), table_size);
+    scoped_ptr<HashTableCtx> ht_ctx;
+    Status status = HashTableCtx::Create(runtime_state_, build_expr_ctxs_,
+        probe_expr_ctxs_, false /* !stores_nulls_ */,
+        vector<bool>(build_expr_ctxs_.size(), false), 1, 0, 1, &tracker_, &ht_ctx);
+    EXPECT_OK(status);
 
     // Insert and probe table_size different tuples. All of them are expected to be
     // successfully inserted and probed.
@@ -441,30 +449,32 @@ class HashTableTest : public testing::Test {
     bool found;
     for (int build_row_val = 0; build_row_val < table_size; ++build_row_val) {
       TupleRow* row = CreateTupleRow(build_row_val);
-      bool passes = ht_ctx.EvalAndHashBuild(row, &hash);
+      bool passes = ht_ctx->EvalAndHashBuild(row);
+      hash = ht_ctx->expr_values_cache()->ExprValuesHash();
       EXPECT_TRUE(passes);
 
       // Insert using both Insert() and FindBucket() methods.
       if (build_row_val % 2 == 0) {
         BufferedTupleStream::RowIdx dummy_row_idx;
         EXPECT_TRUE(hash_table->stores_tuples_);
-        bool inserted = hash_table->Insert(&ht_ctx, dummy_row_idx, row, hash);
+        bool inserted = hash_table->Insert(ht_ctx.get(), dummy_row_idx, row);
         EXPECT_TRUE(inserted);
       } else {
-        iter = hash_table->FindBuildRowBucket(&ht_ctx, hash, &found);
+        iter = hash_table->FindBuildRowBucket(ht_ctx.get(), &found);
         EXPECT_FALSE(iter.AtEnd());
         EXPECT_FALSE(found);
         iter.SetTuple(row->GetTuple(0), hash);
       }
       EXPECT_EQ(hash_table->EmptyBuckets(), table_size - build_row_val - 1);
 
-      passes = ht_ctx.EvalAndHashProbe(row, &hash);
+      passes = ht_ctx->EvalAndHashProbe(row);
+      hash = ht_ctx->expr_values_cache()->ExprValuesHash();
       EXPECT_TRUE(passes);
-      iter = hash_table->FindProbeRow(&ht_ctx, hash);
+      iter = hash_table->FindProbeRow(ht_ctx.get());
       EXPECT_FALSE(iter.AtEnd());
       EXPECT_EQ(row->GetTuple(0), iter.GetTuple());
 
-      iter = hash_table->FindBuildRowBucket(&ht_ctx, hash, &found);
+      iter = hash_table->FindBuildRowBucket(ht_ctx.get(), &found);
       EXPECT_FALSE(iter.AtEnd());
       EXPECT_TRUE(found);
       EXPECT_EQ(row->GetTuple(0), iter.GetTuple());
@@ -474,18 +484,18 @@ class HashTableTest : public testing::Test {
     // hash table code path.
     EXPECT_EQ(hash_table->EmptyBuckets(), 0);
     TupleRow* probe_row = CreateTupleRow(table_size);
-    bool passes = ht_ctx.EvalAndHashProbe(probe_row, &hash);
+    bool passes = ht_ctx->EvalAndHashProbe(probe_row);
     EXPECT_TRUE(passes);
-    iter = hash_table->FindProbeRow(&ht_ctx, hash);
+    iter = hash_table->FindProbeRow(ht_ctx.get());
     EXPECT_TRUE(iter.AtEnd());
 
     // Since hash_table is full, FindBucket cannot find an empty bucket, so returns End().
-    iter = hash_table->FindBuildRowBucket(&ht_ctx, hash, &found);
+    iter = hash_table->FindBuildRowBucket(ht_ctx.get(), &found);
     EXPECT_TRUE(iter.AtEnd());
     EXPECT_FALSE(found);
 
     hash_table->Close();
-    ht_ctx.Close();
+    ht_ctx->Close();
   }
 
   // This test makes sure we can tolerate the low memory case where we do not have enough
@@ -498,12 +508,15 @@ class HashTableTest : public testing::Test {
     scoped_ptr<HashTable> hash_table;
     ASSERT_FALSE(CreateHashTable(quadratic, table_size, &hash_table, block_size,
           max_num_blocks, reserved_blocks));
-    HashTableCtx ht_ctx(build_expr_ctxs_, probe_expr_ctxs_, false,
-        std::vector<bool>(build_expr_ctxs_.size(), false), 1, 0, 1);
-    HashTable::Iterator iter = hash_table->Begin(&ht_ctx);
+    scoped_ptr<HashTableCtx> ht_ctx;
+    Status status = HashTableCtx::Create(runtime_state_, build_expr_ctxs_,
+        probe_expr_ctxs_, false /* !stores_nulls_ */,
+        vector<bool>(build_expr_ctxs_.size(), false), 1, 0, 1, &tracker_, &ht_ctx);
+    EXPECT_OK(status);
+    HashTable::Iterator iter = hash_table->Begin(ht_ctx.get());
     EXPECT_TRUE(iter.AtEnd());
-
     hash_table->Close();
+    ht_ctx->Close();
   }
 };
 
@@ -582,17 +595,23 @@ TEST_F(HashTableTest, QuadraticInsertFullTest) {
 
 // Test that hashing empty string updates hash value.
 TEST_F(HashTableTest, HashEmpty) {
-  HashTableCtx ht_ctx(build_expr_ctxs_, probe_expr_ctxs_, false,
-      std::vector<bool>(build_expr_ctxs_.size(), false), 1, 2, 1);
+  EXPECT_TRUE(test_env_->CreateQueryState(0, 100, 8 * 1024 * 1024,
+      &runtime_state_).ok());
+  scoped_ptr<HashTableCtx> ht_ctx;
+  Status status = HashTableCtx::Create(runtime_state_, build_expr_ctxs_,
+      probe_expr_ctxs_, false /* !stores_nulls_ */,
+      vector<bool>(build_expr_ctxs_.size(), false), 1, 2, 1, &tracker_, &ht_ctx);
+  EXPECT_OK(status);
+
   uint32_t seed = 9999;
-  ht_ctx.set_level(0);
-  EXPECT_NE(seed, ht_ctx.Hash(NULL, 0, seed));
+  ht_ctx->set_level(0);
+  EXPECT_NE(seed, ht_ctx->Hash(NULL, 0, seed));
   // TODO: level 0 uses CRC hash, which only swaps bytes around on empty input.
-  // EXPECT_NE(seed, ht_ctx.Hash(NULL, 0, ht_ctx.Hash(NULL, 0, seed)));
-  ht_ctx.set_level(1);
-  EXPECT_NE(seed, ht_ctx.Hash(NULL, 0, seed));
-  EXPECT_NE(seed, ht_ctx.Hash(NULL, 0, ht_ctx.Hash(NULL, 0, seed)));
-  ht_ctx.Close();
+  // EXPECT_NE(seed, ht_ctx->Hash(NULL, 0, ht_ctx->Hash(NULL, 0, seed)));
+  ht_ctx->set_level(1);
+  EXPECT_NE(seed, ht_ctx->Hash(NULL, 0, seed));
+  EXPECT_NE(seed, ht_ctx->Hash(NULL, 0, ht_ctx->Hash(NULL, 0, seed)));
+  ht_ctx.get()->Close();
 }
 
 TEST_F(HashTableTest, VeryLowMemTest) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a59408b5/be/src/exec/hash-table.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hash-table.cc b/be/src/exec/hash-table.cc
index 9cab765..953ddce 100644
--- a/be/src/exec/hash-table.cc
+++ b/be/src/exec/hash-table.cc
@@ -84,8 +84,8 @@ static const int NUM_SMALL_DATA_PAGES = sizeof(INITIAL_DATA_PAGE_SIZES) / sizeof
 
 HashTableCtx::HashTableCtx(const std::vector<ExprContext*>& build_expr_ctxs,
     const std::vector<ExprContext*>& probe_expr_ctxs, bool stores_nulls,
-    const std::vector<bool>& finds_nulls, int32_t initial_seed,
-    int max_levels, int num_build_tuples)
+    const std::vector<bool>& finds_nulls, int32_t initial_seed, int max_levels,
+    MemTracker* tracker)
     : build_expr_ctxs_(build_expr_ctxs),
       probe_expr_ctxs_(probe_expr_ctxs),
       stores_nulls_(stores_nulls),
@@ -93,17 +93,13 @@ HashTableCtx::HashTableCtx(const std::vector<ExprContext*>& build_expr_ctxs,
       finds_some_nulls_(std::accumulate(
           finds_nulls_.begin(), finds_nulls_.end(), false, std::logical_or<bool>())),
       level_(0),
-      scratch_row_(reinterpret_cast<TupleRow*>(malloc(sizeof(Tuple*) * num_build_tuples))) {
+      scratch_row_(NULL),
+      tracker_(tracker) {
   DCHECK(!finds_some_nulls_ || stores_nulls_);
   // Compute the layout and buffer size to store the evaluated expr results
   DCHECK_EQ(build_expr_ctxs_.size(), probe_expr_ctxs_.size());
   DCHECK_EQ(build_expr_ctxs_.size(), finds_nulls_.size());
   DCHECK(!build_expr_ctxs_.empty());
-  results_buffer_size_ = Expr::ComputeResultsLayout(build_expr_ctxs_,
-      &expr_values_buffer_offsets_, &var_result_begin_);
-  expr_values_buffer_ = new uint8_t[results_buffer_size_];
-  memset(expr_values_buffer_, 0, sizeof(uint8_t) * results_buffer_size_);
-  expr_value_null_bits_ = new uint8_t[build_expr_ctxs.size()];
 
   // Populate the seeds to use for all the levels. TODO: revisit how we generate these.
   DCHECK_GE(max_levels, 0);
@@ -116,32 +112,65 @@ HashTableCtx::HashTableCtx(const std::vector<ExprContext*>& build_expr_ctxs,
   }
 }
 
+Status HashTableCtx::Create(RuntimeState* state,
+    const std::vector<ExprContext*>& build_expr_ctxs,
+    const std::vector<ExprContext*>& probe_expr_ctxs, bool stores_nulls,
+    const std::vector<bool>& finds_nulls, int32_t initial_seed, int max_levels,
+    int num_build_tuples, MemTracker* tracker, scoped_ptr<HashTableCtx>* ht_ctx) {
+  ht_ctx->reset(new HashTableCtx(build_expr_ctxs, probe_expr_ctxs, stores_nulls,
+      finds_nulls, initial_seed, max_levels, tracker));
+  return ht_ctx->get()->Init(state, num_build_tuples);
+}
+
+Status HashTableCtx::Init(RuntimeState* state, int num_build_tuples) {
+  int scratch_row_size = sizeof(Tuple*) * num_build_tuples;
+  scratch_row_ = reinterpret_cast<TupleRow*>(malloc(scratch_row_size));
+  if (UNLIKELY(scratch_row_ == NULL)) {
+    return Status(Substitute("Failed to allocate $0 bytes for scratch row of "
+        "HashTableCtx.", scratch_row_size));
+  }
+  return expr_values_cache_.Init(state, tracker_, build_expr_ctxs_);
+}
+
 void HashTableCtx::Close() {
-  // TODO: use tr1::array?
-  DCHECK(expr_values_buffer_ != NULL);
-  delete[] expr_values_buffer_;
-  expr_values_buffer_ = NULL;
-  DCHECK(expr_value_null_bits_ != NULL);
-  delete[] expr_value_null_bits_;
-  expr_value_null_bits_ = NULL;
   free(scratch_row_);
   scratch_row_ = NULL;
+  expr_values_cache_.Close(tracker_);
+}
+
+uint32_t HashTableCtx::Hash(const void* input, int len, uint32_t hash) const {
+  /// Use CRC hash at first level for better performance. Switch to murmur hash at
+  /// subsequent levels since CRC doesn't randomize well with different seed inputs.
+  if (level_ == 0) return HashUtil::Hash(input, len, hash);
+  return HashUtil::MurmurHash2_64(input, len, hash);
+}
+
+uint32_t HashTableCtx::HashCurrentRow() const {
+  DCHECK_LT(level_, seeds_.size());
+  if (expr_values_cache_.var_result_offset() == -1) {
+    /// This handles NULLs implicitly since a constant seed value was put
+    /// into results buffer for nulls.
+    return Hash(expr_values_cache_.cur_expr_values_,
+        expr_values_cache_.expr_values_bytes_per_row(), seeds_[level_]);
+  } else {
+    return HashTableCtx::HashVariableLenRow();
+  }
 }
 
 bool HashTableCtx::EvalRow(TupleRow* row, const vector<ExprContext*>& ctxs) {
   bool has_null = false;
+  uint8_t* exprs_nullness = expr_values_cache_.ExprValueNullPtr(0);
   for (int i = 0; i < ctxs.size(); ++i) {
-    void* loc = expr_values_buffer_ + expr_values_buffer_offsets_[i];
+    void* loc = expr_values_cache_.ExprValuePtr(i);
     void* val = ctxs[i]->GetValue(row);
     if (val == NULL) {
       // If the table doesn't store nulls, no reason to keep evaluating
       if (!stores_nulls_) return true;
-
-      expr_value_null_bits_[i] = true;
+      exprs_nullness[i] = true;
       val = reinterpret_cast<void*>(&NULL_VALUE);
       has_null = true;
     } else {
-      expr_value_null_bits_[i] = false;
+      exprs_nullness[i] = false;
     }
     DCHECK_LE(build_expr_ctxs_[i]->root()->type().GetSlotSize(),
         sizeof(NULL_VALUE));
@@ -152,18 +181,20 @@ bool HashTableCtx::EvalRow(TupleRow* row, const vector<ExprContext*>& ctxs) {
 
 uint32_t HashTableCtx::HashVariableLenRow() const {
   uint32_t hash = seeds_[level_];
+  int var_result_offset = expr_values_cache_.var_result_offset();
   // Hash the non-var length portions (if there are any)
-  if (var_result_begin_ != 0) {
-    hash = Hash(expr_values_buffer_, var_result_begin_, hash);
+  if (var_result_offset != 0) {
+    hash = Hash(expr_values_cache_.cur_expr_values_, var_result_offset, hash);
   }
 
+  uint8_t* exprs_nullness = expr_values_cache_.ExprValueNullPtr(0);
   for (int i = 0; i < build_expr_ctxs_.size(); ++i) {
-    // non-string and null slots are already part of expr_values_buffer
+    // non-string and null slots are already part of cur_expr_values_
     if (build_expr_ctxs_[i]->root()->type().type != TYPE_STRING &&
         build_expr_ctxs_[i]->root()->type().type != TYPE_VARCHAR) continue;
 
-    void* loc = expr_values_buffer_ + expr_values_buffer_offsets_[i];
-    if (expr_value_null_bits_[i]) {
+    void* loc = expr_values_cache_.ExprValuePtr(i);
+    if (exprs_nullness[i]) {
       // Hash the null random seed values at 'loc'
       hash = Hash(loc, sizeof(StringValue), hash);
     } else {
@@ -178,17 +209,18 @@ uint32_t HashTableCtx::HashVariableLenRow() const {
 
 template<bool FORCE_NULL_EQUALITY>
 bool HashTableCtx::Equals(TupleRow* build_row) const {
+  uint8_t* exprs_nullness = expr_values_cache_.ExprValueNullPtr(0);
   for (int i = 0; i < build_expr_ctxs_.size(); ++i) {
     void* val = build_expr_ctxs_[i]->GetValue(build_row);
     if (val == NULL) {
       if (!(FORCE_NULL_EQUALITY || finds_nulls_[i])) return false;
-      if (!expr_value_null_bits_[i]) return false;
+      if (!exprs_nullness[i]) return false;
       continue;
     } else {
-      if (expr_value_null_bits_[i]) return false;
+      if (exprs_nullness[i]) return false;
     }
 
-    void* loc = expr_values_buffer_ + expr_values_buffer_offsets_[i];
+    void* loc = expr_values_cache_.ExprValuePtr(i);
     if (!RawValue::Eq(loc, val, build_expr_ctxs_[i]->root()->type())) {
       return false;
     }
@@ -199,6 +231,113 @@ bool HashTableCtx::Equals(TupleRow* build_row) const {
 template bool HashTableCtx::Equals<true>(TupleRow* build_row) const;
 template bool HashTableCtx::Equals<false>(TupleRow* build_row) const;
 
+HashTableCtx::ExprValuesCache::ExprValuesCache()
+    : capacity_(0),
+      cur_expr_values_(NULL),
+      cur_expr_values_null_(NULL),
+      cur_expr_values_hash_(NULL),
+      cur_expr_values_hash_end_(NULL),
+      expr_values_array_(NULL),
+      expr_values_null_array_(NULL),
+      expr_values_hash_array_(NULL),
+      null_bitmap_(0) { }
+
+Status HashTableCtx::ExprValuesCache::Init(RuntimeState* state,
+    MemTracker* tracker, const std::vector<ExprContext*>& build_expr_ctxs) {
+  // Initialize the number of expressions.
+  num_exprs_ = build_expr_ctxs.size();
+  // Compute the layout of evaluated values of a row.
+  expr_values_bytes_per_row_ = Expr::ComputeResultsLayout(build_expr_ctxs,
+      &expr_values_offsets_, &var_result_offset_);
+  if (expr_values_bytes_per_row_ == 0) {
+    DCHECK_EQ(num_exprs_, 0);
+    return Status::OK();
+  }
+  DCHECK_GT(expr_values_bytes_per_row_, 0);
+  // Compute the maximum number of cached rows which can fit in the memory budget.
+  // TODO: Find the optimal prefetch batch size. This may be something
+  // processor dependent so we may need calibration at Impala startup time.
+  capacity_ = std::max(1, std::min(state->batch_size(),
+      MAX_EXPR_VALUES_ARRAY_SIZE / expr_values_bytes_per_row_));
+
+  int mem_usage = MemUsage(capacity_, expr_values_bytes_per_row_, num_exprs_);
+  if (!tracker->TryConsume(mem_usage)) {
+    capacity_ = 0;
+    string details = Substitute("HashTableCtx::ExprValuesCache failed to allocate $0 bytes.",
+        mem_usage);
+    return tracker->MemLimitExceeded(state, details, mem_usage);
+  }
+
+  int expr_values_size = expr_values_bytes_per_row_ * capacity_;
+  expr_values_array_.reset(new uint8_t[expr_values_size]);
+  cur_expr_values_ = expr_values_array_.get();
+  memset(cur_expr_values_, 0, expr_values_size);
+
+  int expr_values_null_size = num_exprs_ * capacity_;
+  expr_values_null_array_.reset(new uint8_t[expr_values_null_size]);
+  cur_expr_values_null_ = expr_values_null_array_.get();
+  memset(cur_expr_values_null_, 0, expr_values_null_size);
+
+  expr_values_hash_array_.reset(new uint32_t[capacity_]);
+  cur_expr_values_hash_ = expr_values_hash_array_.get();
+  cur_expr_values_hash_end_ = cur_expr_values_hash_;
+  memset(cur_expr_values_hash_, 0, sizeof(uint32) * capacity_);
+
+  null_bitmap_.Reset(capacity_);
+  return Status::OK();
+}
+
+void HashTableCtx::ExprValuesCache::Close(MemTracker* tracker) {
+  if (capacity_ == 0) return;
+  cur_expr_values_ = NULL;
+  cur_expr_values_null_ = NULL;
+  cur_expr_values_hash_ = NULL;
+  cur_expr_values_hash_end_ = NULL;
+  expr_values_array_.reset();
+  expr_values_null_array_.reset();
+  expr_values_hash_array_.reset();
+  null_bitmap_.Reset(0);
+  int mem_usage = MemUsage(capacity_, expr_values_bytes_per_row_, num_exprs_);
+  tracker->Release(mem_usage);
+}
+
+int HashTableCtx::ExprValuesCache::MemUsage(int capacity,
+    int expr_values_bytes_per_row, int num_exprs) {
+  return expr_values_bytes_per_row * capacity + // expr_values_array_
+      num_exprs * capacity +                    // expr_values_null_array_
+      sizeof(uint32) * capacity +               // expr_values_hash_array_
+      Bitmap::MemUsage(capacity);               // null_bitmap_
+}
+
+uint8_t* HashTableCtx::ExprValuesCache::ExprValuePtr(int expr_idx) const {
+  return cur_expr_values_ + expr_values_offsets_[expr_idx];
+}
+
+uint8_t* HashTableCtx::ExprValuesCache::ExprValueNullPtr(int expr_idx) const {
+  return cur_expr_values_null_ + expr_idx;
+}
+
+void HashTableCtx::ExprValuesCache::ResetIterators() {
+  cur_expr_values_ = expr_values_array_.get();
+  cur_expr_values_null_ = expr_values_null_array_.get();
+  cur_expr_values_hash_ = expr_values_hash_array_.get();
+}
+
+void HashTableCtx::ExprValuesCache::Reset() {
+  ResetIterators();
+  // Set the end pointer after resetting the other pointers so they point to
+  // the same location.
+  cur_expr_values_hash_end_ = cur_expr_values_hash_;
+  null_bitmap_.SetAllBits(false);
+}
+
+void HashTableCtx::ExprValuesCache::ResetForRead() {
+  // Record the end of hash values iterator to be used in AtEnd().
+  // Do it before resetting the pointers.
+  cur_expr_values_hash_end_ = cur_expr_values_hash_;
+  ResetIterators();
+}
+
 const double HashTable::MAX_FILL_FACTOR = 0.75f;
 
 HashTable* HashTable::Create(RuntimeState* state,
@@ -306,7 +445,7 @@ bool HashTable::ResizeBuckets(int64_t num_buckets, const HashTableCtx* ht_ctx) {
     Bucket* bucket_to_copy = &buckets_[iter.bucket_idx_];
     bool found = false;
     int64_t bucket_idx =
-        Probe<true>(new_buckets, num_buckets, false, NULL, NULL, bucket_to_copy->hash, &found);
+        Probe<true>(new_buckets, num_buckets, NULL, bucket_to_copy->hash, &found);
     DCHECK(!found);
     DCHECK_NE(bucket_idx, Iterator::BUCKET_NOT_FOUND) << " Probe failed even though "
         " there are free buckets. " << num_buckets << " " << num_filled_buckets_;
@@ -458,30 +597,71 @@ static void CodegenAssignNullValue(LlvmCodeGen* codegen,
 }
 
 // Codegen for evaluating a tuple row over either build_expr_ctxs_ or probe_expr_ctxs_.
-// For the case where we are joining on a single int, the IR looks like
-// define i1 @EvalBuildRow(%"class.impala::HashTableCtx"* %this_ptr,
-//                         %"class.impala::TupleRow"* %row) #20 {
+// For a group by with (bigint, string) the IR looks like
+// define i1 @EvalProbeRow(%"class.impala::HashTableCtx"* %this_ptr,
+//                         %"class.impala::TupleRow"* %row) #33 {
 // entry:
-//   %result = call i64 @GetSlotRef1(%"class.impala::ExprContext"* inttoptr
-//                                     (i64 67971664 to %"class.impala::ExprContext"*),
-//                                   %"class.impala::TupleRow"* %row)
+//   %0 = load i8*, i8** inttoptr (i64 230325056 to i8**)
+//   %1 = load i8*, i8** inttoptr (i64 230325064 to i8**)
+//   %loc_addr = getelementptr i8, i8* %0, i32 0
+//   %loc = bitcast i8* %loc_addr to i32*
+//   %result = call i64 @GetSlotRef.3(%"class.impala::ExprContext"*
+//             inttoptr (i64 158123712 to %"class.impala::ExprContext"*),
+//             %"class.impala::TupleRow"* %row)
 //   %is_null = trunc i64 %result to i1
-//   %0 = zext i1 %is_null to i8
-//   store i8 %0, i8* inttoptr (i64 95753144 to i8*)
+//   %2 = zext i1 %is_null to i8
+//   %null_byte_loc = getelementptr i8, i8* %1, i32 0
+//   store i8 %2, i8* %null_byte_loc
 //   br i1 %is_null, label %null, label %not_null
 //
 // null:                                             ; preds = %entry
-//   store i32 -2128831035, i32* inttoptr (i64 95753128 to i32*)
+//   store i32 -2128831035, i32* %loc
 //   br label %continue
 //
 // not_null:                                         ; preds = %entry
-//   %1 = ashr i64 %result, 32
-//   %2 = trunc i64 %1 to i32
-//   store i32 %2, i32* inttoptr (i64 95753128 to i32*)
+//   %3 = ashr i64 %result, 32
+//   %4 = trunc i64 %3 to i32
+//   store i32 %4, i32* %loc
 //   br label %continue
 //
 // continue:                                         ; preds = %not_null, %null
-//   ret i1 true
+//   %is_null_phi = phi i1 [ true, %null ], [ false, %not_null ]
+//   %has_null = or i1 false, %is_null_phi
+//   %loc_addr1 = getelementptr i8, i8* %0, i32 8
+//   %loc2 = bitcast i8* %loc_addr1 to %"struct.impala::StringValue"*
+//   %result6 = call { i64, i8* } @GetSlotRef.4(%"class.impala::ExprContext"*
+//              inttoptr (i64 158123904 to %"class.impala::ExprContext"*),
+//              %"class.impala::TupleRow"* %row)
+//   %5 = extractvalue { i64, i8* } %result6, 0
+//   %is_null7 = trunc i64 %5 to i1
+//   %6 = zext i1 %is_null7 to i8
+//   %null_byte_loc8 = getelementptr i8, i8* %1, i32 1
+//   store i8 %6, i8* %null_byte_loc8
+//   br i1 %is_null7, label %null3, label %not_null4
+//
+// null3:                                            ; preds = %continue
+//   %string_ptr = getelementptr inbounds %"struct.impala::StringValue",
+//                 %"struct.impala::StringValue"* %loc2, i32 0, i32 0
+//   %string_len = getelementptr inbounds %"struct.impala::StringValue",
+//                 %"struct.impala::StringValue"* %loc2, i32 0, i32 1
+//   store i8* inttoptr (i32 -2128831035 to i8*), i8** %string_ptr
+//   store i32 -2128831035, i32* %string_len
+//   br label %continue5
+//
+// not_null4:                                        ; preds = %continue
+//   %result9 = extractvalue { i64, i8* } %result6, 1
+//   %7 = insertvalue %"struct.impala::StringValue" zeroinitializer, i8* %result9, 0
+//   %8 = extractvalue { i64, i8* } %result6, 0
+//   %9 = ashr i64 %8, 32
+//   %10 = trunc i64 %9 to i32
+//   %11 = insertvalue %"struct.impala::StringValue" %7, i32 %10, 1
+//   store %"struct.impala::StringValue" %11, %"struct.impala::StringValue"* %loc2
+//   br label %continue5
+//
+// continue5:                                        ; preds = %not_null4, %null3
+//   %is_null_phi10 = phi i1 [ true, %null3 ], [ false, %not_null4 ]
+//   %has_null11 = or i1 %has_null, %is_null_phi10
+//   ret i1 %has_null11
 // }
 // For each expr, we create 3 code blocks.  The null, not null and continue blocks.
 // Both the null and not null branch into the continue block.  The continue block
@@ -509,7 +689,7 @@ Status HashTableCtx::CodegenEvalRow(RuntimeState* state, bool build, Function**
   Type* this_type = codegen->GetType(HashTableCtx::LLVM_CLASS_NAME);
   DCHECK(this_type != NULL);
   PointerType* this_ptr_type = PointerType::get(this_type, 0);
-
+  PointerType* buffer_ptr_type = PointerType::get(codegen->ptr_type(), 0);
   LlvmCodeGen::FnPrototype prototype(codegen, build ? "EvalBuildRow" : "EvalProbeRow",
       codegen->GetType(TYPE_BOOLEAN));
   prototype.AddArgument(LlvmCodeGen::NamedVariable("this_ptr", this_ptr_type));
@@ -519,18 +699,29 @@ Status HashTableCtx::CodegenEvalRow(RuntimeState* state, bool build, Function**
   LlvmCodeGen::LlvmBuilder builder(context);
   Value* args[2];
   *fn = prototype.GeneratePrototype(&builder, args);
-
   Value* row = args[1];
   Value* has_null = codegen->false_value();
 
+  // Load cur_expr_values_ into a LLVM pointer.
+  Value* cur_expr_values_ptr = codegen->CastPtrToLlvmPtr(buffer_ptr_type,
+      &expr_values_cache_.cur_expr_values_);
+  Value* cur_expr_values = builder.CreateLoad(cur_expr_values_ptr);
+
+  // Load cur_expr_values_null_ into a LLVM pointer.
+  Value* cur_expr_values_null_ptr = codegen->CastPtrToLlvmPtr(buffer_ptr_type,
+      &expr_values_cache_.cur_expr_values_null_);
+  Value* cur_expr_values_null = builder.CreateLoad(cur_expr_values_null_ptr);
+
   for (int i = 0; i < ctxs.size(); ++i) {
     // TODO: refactor this to somewhere else?  This is not hash table specific except for
     // the null handling bit and would be used for anyone that needs to materialize a
     // vector of exprs
     // Convert result buffer to llvm ptr type
-    void* loc = expr_values_buffer_ + expr_values_buffer_offsets_[i];
-    Value* llvm_loc = codegen->CastPtrToLlvmPtr(
-        codegen->GetPtrType(ctxs[i]->root()->type()), loc);
+    int offset = expr_values_cache_.expr_values_offsets(i);
+    Value* loc = builder.CreateGEP(NULL, cur_expr_values,
+        codegen->GetIntConstant(TYPE_INT, offset), "loc_addr");
+    Value* llvm_loc = builder.CreatePointerCast(loc,
+        codegen->GetPtrType(ctxs[i]->root()->type()), "loc");
 
     BasicBlock* null_block = BasicBlock::Create(context, "null", *fn);
     BasicBlock* not_null_block = BasicBlock::Create(context, "not_null", *fn);
@@ -555,11 +746,9 @@ Status HashTableCtx::CodegenEvalRow(RuntimeState* state, bool build, Function**
 
     // Set null-byte result
     Value* null_byte = builder.CreateZExt(is_null, codegen->GetType(TYPE_TINYINT));
-    uint8_t* null_byte_loc = &expr_value_null_bits_[i];
-    Value* llvm_null_byte_loc =
-        codegen->CastPtrToLlvmPtr(codegen->ptr_type(), null_byte_loc);
+    Value* llvm_null_byte_loc = builder.CreateGEP(NULL, cur_expr_values_null,
+        codegen->GetIntConstant(TYPE_INT, i), "null_byte_loc");
     builder.CreateStore(null_byte, llvm_null_byte_loc);
-
     builder.CreateCondBr(is_null, null_block, not_null_block);
 
     // Null block
@@ -599,30 +788,37 @@ Status HashTableCtx::CodegenEvalRow(RuntimeState* state, bool build, Function**
 
 // Codegen for hashing the current row.  In the case with both string and non-string data
 // (group by int_col, string_col), the IR looks like:
-// define i32 @HashCurrentRow(%"class.impala::HashTableCtx"* %this_ptr) #20 {
+// define i32 @HashCurrentRow(%"class.impala::HashTableCtx"* %this_ptr) #33 {
 // entry:
-//   %seed = call i32 @GetHashSeed(%"class.impala::HashTableCtx"* %this_ptr)
-//   %0 = call i32 @CrcHash16(i8* inttoptr (i64 119151296 to i8*), i32 16, i32 %seed)
-//   %1 = load i8* inttoptr (i64 119943721 to i8*)
-//   %2 = icmp ne i8 %1, 0
-//   br i1 %2, label %null, label %not_null
+//   %0 = load i8*, i8** inttoptr (i64 230325056 to i8**)
+//   %1 = load i8*, i8** inttoptr (i64 230325064 to i8**)
+//   %seed = call i32 @_ZNK6impala12HashTableCtx11GetHashSeedEv(
+//           %"class.impala::HashTableCtx"* %this_ptr)
+//   %hash = call i32 @CrcHash8(i8* %0, i32 8, i32 %seed)
+//   %loc_addr = getelementptr i8, i8* %0, i32 8
+//   %null_byte_loc = getelementptr i8, i8* %1, i32 1
+//   %null_byte = load i8, i8* %null_byte_loc
+//   %is_null = icmp ne i8 %null_byte, 0
+//   br i1 %is_null, label %null, label %not_null
 //
 // null:                                             ; preds = %entry
-//   %3 = call i32 @CrcHash161(i8* inttoptr (i64 119151312 to i8*), i32 16, i32 %0)
+//   %str_null = call i32 @CrcHash16(i8* %loc_addr, i32 16, i32 %hash)
 //   br label %continue
 //
 // not_null:                                         ; preds = %entry
-//   %4 = load i8** getelementptr inbounds (%"struct.impala::StringValue"* inttoptr
-//       (i64 119151312 to %"struct.impala::StringValue"*), i32 0, i32 0)
-//   %5 = load i32* getelementptr inbounds (%"struct.impala::StringValue"* inttoptr
-//       (i64 119151312 to %"struct.impala::StringValue"*), i32 0, i32 1)
-//   %6 = call i32 @IrCrcHash(i8* %4, i32 %5, i32 %0)
+//   %str_val = bitcast i8* %loc_addr to %"struct.impala::StringValue"*
+//   %2 = getelementptr inbounds %"struct.impala::StringValue",
+//        %"struct.impala::StringValue"* %str_val, i32 0, i32 0
+//   %3 = getelementptr inbounds %"struct.impala::StringValue",
+//        %"struct.impala::StringValue"* %str_val, i32 0, i32 1
+//   %ptr = load i8*, i8** %2
+//   %len = load i32, i32* %3
+//   %string_hash = call i32 @IrCrcHash(i8* %ptr, i32 %len, i32 %hash)
 //   br label %continue
 //
 // continue:                                         ; preds = %not_null, %null
-//   %7 = phi i32 [ %6, %not_null ], [ %3, %null ]
-//   call void @set_hash(%"class.impala::HashTableCtx"* %this_ptr, i32 %7)
-//   ret i32 %7
+//   %hash_phi = phi i32 [ %string_hash, %not_null ], [ %str_null, %null ]
+//   ret i32 %hash_phi
 // }
 Status HashTableCtx::CodegenHashCurrentRow(RuntimeState* state, bool use_murmur,
     Function** fn) {
@@ -640,6 +836,7 @@ Status HashTableCtx::CodegenHashCurrentRow(RuntimeState* state, bool use_murmur,
   Type* this_type = codegen->GetType(HashTableCtx::LLVM_CLASS_NAME);
   DCHECK(this_type != NULL);
   PointerType* this_ptr_type = PointerType::get(this_type, 0);
+  PointerType* buffer_ptr_type = PointerType::get(codegen->ptr_type(), 0);
 
   LlvmCodeGen::FnPrototype prototype(codegen,
       (use_murmur ? "MurmurHashCurrentRow" : "HashCurrentRow"),
@@ -651,6 +848,16 @@ Status HashTableCtx::CodegenHashCurrentRow(RuntimeState* state, bool use_murmur,
   Value* this_arg;
   *fn = prototype.GeneratePrototype(&builder, &this_arg);
 
+  // Load cur_expr_values_ into a LLVM pointer.
+  Value* cur_expr_values_ptr =
+      codegen->CastPtrToLlvmPtr(buffer_ptr_type, &expr_values_cache_.cur_expr_values_);
+  Value* cur_expr_values = builder.CreateLoad(cur_expr_values_ptr);
+
+  // Load cur_expr_values_null_ into a LLVM pointer.
+  Value* cur_expr_values_null_ptr =
+      codegen->CastPtrToLlvmPtr(buffer_ptr_type, &expr_values_cache_.cur_expr_values_null_);
+  Value* cur_expr_values_null = builder.CreateLoad(cur_expr_values_null_ptr);
+
   // Call GetHashSeed() to get seeds_[level_]
   Function* get_hash_seed_fn =
       codegen->GetFunction(IRFunction::HASH_TABLE_GET_HASH_SEED, false);
@@ -658,25 +865,26 @@ Status HashTableCtx::CodegenHashCurrentRow(RuntimeState* state, bool use_murmur,
       "seed");
 
   Value* hash_result = seed;
-  Value* data = codegen->CastPtrToLlvmPtr(codegen->ptr_type(), expr_values_buffer_);
-  if (var_result_begin_ == -1) {
-    // No variable length slots, just hash what is in 'expr_values_buffer_'
-    if (results_buffer_size_ > 0) {
+  const int var_result_offset = expr_values_cache_.var_result_offset();
+  const int expr_values_bytes_per_row = expr_values_cache_.expr_values_bytes_per_row();
+  if (var_result_offset == -1) {
+    // No variable length slots, just hash what is in 'expr_values_cache_'
+    if (expr_values_bytes_per_row > 0) {
       Function* hash_fn = use_murmur ?
-                          codegen->GetMurmurHashFunction(results_buffer_size_) :
-                          codegen->GetHashFunction(results_buffer_size_);
-      Value* len = codegen->GetIntConstant(TYPE_INT, results_buffer_size_);
+                          codegen->GetMurmurHashFunction(expr_values_bytes_per_row) :
+                          codegen->GetHashFunction(expr_values_bytes_per_row);
+      Value* len = codegen->GetIntConstant(TYPE_INT, expr_values_bytes_per_row);
       hash_result = builder.CreateCall(hash_fn,
-          ArrayRef<Value*>({data, len, hash_result}), "hash");
+          ArrayRef<Value*>({cur_expr_values, len, hash_result}), "hash");
     }
   } else {
-    if (var_result_begin_ > 0) {
+    if (var_result_offset > 0) {
       Function* hash_fn = use_murmur ?
-                          codegen->GetMurmurHashFunction(var_result_begin_) :
-                          codegen->GetHashFunction(var_result_begin_);
-      Value* len = codegen->GetIntConstant(TYPE_INT, var_result_begin_);
+                          codegen->GetMurmurHashFunction(var_result_offset) :
+                          codegen->GetHashFunction(var_result_offset);
+      Value* len = codegen->GetIntConstant(TYPE_INT, var_result_offset);
       hash_result = builder.CreateCall(hash_fn,
-          ArrayRef<Value*>({data, len, hash_result}), "hash");
+          ArrayRef<Value*>({cur_expr_values, len, hash_result}), "hash");
     }
 
     // Hash string slots
@@ -689,7 +897,9 @@ Status HashTableCtx::CodegenHashCurrentRow(RuntimeState* state, bool use_murmur,
       BasicBlock* continue_block = NULL;
       Value* str_null_result = NULL;
 
-      void* loc = expr_values_buffer_ + expr_values_buffer_offsets_[i];
+      int offset = expr_values_cache_.expr_values_offsets(i);
+      Value* llvm_loc = builder.CreateGEP(NULL, cur_expr_values,
+          codegen->GetIntConstant(TYPE_INT, offset), "loc_addr");
 
       // If the hash table stores nulls, we need to check if the stringval
       // evaluated to NULL
@@ -698,9 +908,8 @@ Status HashTableCtx::CodegenHashCurrentRow(RuntimeState* state, bool use_murmur,
         not_null_block = BasicBlock::Create(context, "not_null", *fn);
         continue_block = BasicBlock::Create(context, "continue", *fn);
 
-        uint8_t* null_byte_loc = &expr_value_null_bits_[i];
-        Value* llvm_null_byte_loc =
-            codegen->CastPtrToLlvmPtr(codegen->ptr_type(), null_byte_loc);
+        Value* llvm_null_byte_loc = builder.CreateGEP(NULL, cur_expr_values_null,
+            codegen->GetIntConstant(TYPE_INT, i), "null_byte_loc");
         Value* null_byte = builder.CreateLoad(llvm_null_byte_loc, "null_byte");
         Value* is_null = builder.CreateICmpNE(null_byte,
             codegen->GetIntConstant(TYPE_TINYINT, 0), "is_null");
@@ -712,7 +921,6 @@ Status HashTableCtx::CodegenHashCurrentRow(RuntimeState* state, bool use_murmur,
         Function* null_hash_fn = use_murmur ?
                                  codegen->GetMurmurHashFunction(sizeof(StringValue)) :
                                  codegen->GetHashFunction(sizeof(StringValue));
-        Value* llvm_loc = codegen->CastPtrToLlvmPtr(codegen->ptr_type(), loc);
         Value* len = codegen->GetIntConstant(TYPE_INT, sizeof(StringValue));
         str_null_result = builder.CreateCall(null_hash_fn,
             ArrayRef<Value*>({llvm_loc, len, hash_result}), "str_null");
@@ -722,7 +930,8 @@ Status HashTableCtx::CodegenHashCurrentRow(RuntimeState* state, bool use_murmur,
       }
 
       // Convert expr_values_buffer_ loc to llvm value
-      Value* str_val = codegen->CastPtrToLlvmPtr(codegen->GetPtrType(TYPE_STRING), loc);
+      Value* str_val = builder.CreatePointerCast(llvm_loc,
+          codegen->GetPtrType(TYPE_STRING), "str_val");
 
       Value* ptr = builder.CreateStructGEP(NULL, str_val, 0);
       Value* len = builder.CreateStructGEP(NULL, str_val, 1);
@@ -759,55 +968,71 @@ Status HashTableCtx::CodegenHashCurrentRow(RuntimeState* state, bool use_murmur,
   return Status::OK();
 }
 
-// Codegen for HashTableCtx::Equals.  For a hash table with two exprs (string,int),
+// Codegen for HashTableCtx::Equals.  For a group by with (bigint, string),
 // the IR looks like:
 //
 // define i1 @Equals(%"class.impala::HashTableCtx"* %this_ptr,
-//                   %"class.impala::TupleRow"* %row) {
+//                   %"class.impala::TupleRow"* %row) #33 {
 // entry:
+//   %0 = alloca { i64, i8* }
+//   %1 = load i8*, i8** inttoptr (i64 230325056 to i8**)
+//   %2 = load i8*, i8** inttoptr (i64 230325064 to i8**)
 //   %result = call i64 @GetSlotRef(%"class.impala::ExprContext"* inttoptr
-//                                  (i64 146381856 to %"class.impala::ExprContext"*),
-//                                  %"class.impala::TupleRow"* %row)
-//   %0 = trunc i64 %result to i1
-//   br i1 %0, label %null, label %not_null
+//             (i64 165557504 to %"class.impala::ExprContext"*),
+//             %"class.impala::TupleRow"* %row)
+//   %is_null = trunc i64 %result to i1
+//   %null_byte_loc = getelementptr i8, i8* %2, i32 0
+//   %3 = load i8, i8* %null_byte_loc
+//   %4 = icmp ne i8 %3, 0
+//   %loc = getelementptr i8, i8* %1, i32 0
+//   %row_val = bitcast i8* %loc to i32*
+//   br i1 %is_null, label %null, label %not_null
 //
-// false_block:                            ; preds = %not_null2, %null1, %not_null, %null
+// false_block:                ; preds = %cmp9, %not_null2, %null1, %cmp, %not_null, %null
 //   ret i1 false
 //
 // null:                                             ; preds = %entry
-//   br i1 false, label %continue, label %false_block
+//   br i1 %4, label %continue, label %false_block
 //
 // not_null:                                         ; preds = %entry
-//   %1 = load i32* inttoptr (i64 104774368 to i32*)
-//   %2 = ashr i64 %result, 32
-//   %3 = trunc i64 %2 to i32
-//   %cmp_raw = icmp eq i32 %3, %1
-//   br i1 %cmp_raw, label %continue, label %false_block
+//   br i1 %4, label %false_block, label %cmp
 //
-// continue:                                         ; preds = %not_null, %null
-//   %result4 = call { i64, i8* } @GetSlotRef1(
-//       %"class.impala::ExprContext"* inttoptr
-//       (i64 146381696 to %"class.impala::ExprContext"*),
-//       %"class.impala::TupleRow"* %row)
-//   %4 = extractvalue { i64, i8* } %result4, 0
-//   %5 = trunc i64 %4 to i1
-//   br i1 %5, label %null1, label %not_null2
+// continue:                                         ; preds = %cmp, %null
+//   %result4 = call { i64, i8* } @GetSlotRef.2(%"class.impala::ExprContext"*
+//              inttoptr (i64 165557696 to %"class.impala::ExprContext"*),
+//              %"class.impala::TupleRow"* %row)
+//   %5 = extractvalue { i64, i8* } %result4, 0
+//   %is_null5 = trunc i64 %5 to i1
+//   %null_byte_loc6 = getelementptr i8, i8* %2, i32 1
+//   %6 = load i8, i8* %null_byte_loc6
+//   %7 = icmp ne i8 %6, 0
+//   %loc7 = getelementptr i8, i8* %1, i32 8
+//   %row_val8 = bitcast i8* %loc7 to %"struct.impala::StringValue"*
+//   br i1 %is_null5, label %null1, label %not_null2
+//
+// cmp:                                              ; preds = %not_null
+//   %8 = load i32, i32* %row_val
+//   %9 = ashr i64 %result, 32
+//   %10 = trunc i64 %9 to i32
+//   %cmp_raw = icmp eq i32 %10, %8
+//   br i1 %cmp_raw, label %continue, label %false_block
 //
 // null1:                                            ; preds = %continue
-//   br i1 false, label %continue3, label %false_block
+//   br i1 %7, label %continue3, label %false_block
 //
 // not_null2:                                        ; preds = %continue
-//   %6 = extractvalue { i64, i8* } %result4, 0
-//   %7 = ashr i64 %6, 32
-//   %8 = trunc i64 %7 to i32
-//   %result5 = extractvalue { i64, i8* } %result4, 1
-//   %cmp_raw6 = call i1 @_Z11StringValEQPciPKN6impala11StringValueE(
-//       i8* %result5, i32 %8, %"struct.impala::StringValue"* inttoptr
-//       (i64 104774384 to %"struct.impala::StringValue"*))
-//   br i1 %cmp_raw6, label %continue3, label %false_block
+//   br i1 %7, label %false_block, label %cmp9
 //
-// continue3:                                        ; preds = %not_null2, %null1
+// continue3:                                        ; preds = %cmp9, %null1
 //   ret i1 true
+//
+// cmp9:                                             ; preds = %not_null2
+//   store { i64, i8* } %result4, { i64, i8* }* %0
+//   %11 = bitcast { i64, i8* }* %0 to %"struct.impala_udf::StringVal"*
+//   %cmp_raw10 = call i1 @_Z13StringValueEqRKN10impala_udf9StringValERKN6
+//                impala11StringValueE(%"struct.impala_udf::StringVal"* %11,
+//                %"struct.impala::StringValue"* %row_val8)
+//   br i1 %cmp_raw10, label %continue3, label %false_block
 // }
 Status HashTableCtx::CodegenEquals(RuntimeState* state, bool force_null_equality,
     Function** fn) {
@@ -828,7 +1053,7 @@ Status HashTableCtx::CodegenEquals(RuntimeState* state, bool force_null_equality
   Type* this_type = codegen->GetType(HashTableCtx::LLVM_CLASS_NAME);
   DCHECK(this_type != NULL);
   PointerType* this_ptr_type = PointerType::get(this_type, 0);
-
+  PointerType* buffer_ptr_type = PointerType::get(codegen->ptr_type(), 0);
   LlvmCodeGen::FnPrototype prototype(codegen, "Equals", codegen->GetType(TYPE_BOOLEAN));
   prototype.AddArgument(LlvmCodeGen::NamedVariable("this_ptr", this_ptr_type));
   prototype.AddArgument(LlvmCodeGen::NamedVariable("row", tuple_row_ptr_type));
@@ -839,6 +1064,16 @@ Status HashTableCtx::CodegenEquals(RuntimeState* state, bool force_null_equality
   *fn = prototype.GeneratePrototype(&builder, args);
   Value* row = args[1];
 
+  // Load cur_expr_values_ into a LLVM pointer.
+  Value* cur_expr_values_ptr = codegen->CastPtrToLlvmPtr(buffer_ptr_type,
+      &expr_values_cache_.cur_expr_values_);
+  Value* cur_expr_values = builder.CreateLoad(cur_expr_values_ptr);
+
+  // Load cur_expr_values_null_ into a LLVM pointer.
+  Value* cur_expr_values_null_ptr = codegen->CastPtrToLlvmPtr(buffer_ptr_type,
+      &expr_values_cache_.cur_expr_values_null_);
+  Value* cur_expr_values_null = builder.CreateLoad(cur_expr_values_null_ptr);
+
   BasicBlock* false_block = BasicBlock::Create(context, "false_block", *fn);
   for (int i = 0; i < build_expr_ctxs_.size(); ++i) {
     BasicBlock* null_block = BasicBlock::Create(context, "null", *fn);
@@ -862,25 +1097,26 @@ Status HashTableCtx::CodegenEquals(RuntimeState* state, bool force_null_equality
         build_expr_ctxs_[i]->root()->type(), expr_fn, expr_fn_args, "result");
     Value* is_null = result.GetIsNull();
 
-    // Determine if row is null (i.e. expr_value_null_bits_[i] == true). In
+    // Determine if row is null (i.e. cur_expr_values_null_[i] == true). In
     // the case where the hash table does not store nulls, this is always false.
     Value* row_is_null = codegen->false_value();
-    uint8_t* null_byte_loc = &expr_value_null_bits_[i];
 
     // We consider null values equal if we are comparing build rows or if the join
     // predicate is <=>
     if (force_null_equality || finds_nulls_[i]) {
-      Value* llvm_null_byte_loc =
-          codegen->CastPtrToLlvmPtr(codegen->ptr_type(), null_byte_loc);
+      Value* llvm_null_byte_loc = builder.CreateGEP(NULL, cur_expr_values_null,
+          codegen->GetIntConstant(TYPE_INT, i), "null_byte_loc");
       Value* null_byte = builder.CreateLoad(llvm_null_byte_loc);
       row_is_null = builder.CreateICmpNE(null_byte,
           codegen->GetIntConstant(TYPE_TINYINT, 0));
     }
 
-    // Get llvm value for row_val from 'expr_values_buffer_'
-    void* loc = expr_values_buffer_ + expr_values_buffer_offsets_[i];
-    Value* row_val = codegen->CastPtrToLlvmPtr(
-        codegen->GetPtrType(build_expr_ctxs_[i]->root()->type()), loc);
+    // Get llvm value for row_val from 'cur_expr_values_'
+    int offset = expr_values_cache_.expr_values_offsets(i);
+    Value* loc = builder.CreateGEP(NULL, cur_expr_values,
+        codegen->GetIntConstant(TYPE_INT, offset), "loc");
+    Value* row_val = builder.CreatePointerCast(loc,
+        codegen->GetPtrType(build_expr_ctxs_[i]->root()->type()), "row_val");
 
     // Branch for GetValue() returning NULL
     builder.CreateCondBr(is_null, null_block, not_null_block);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a59408b5/be/src/exec/hash-table.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hash-table.h b/be/src/exec/hash-table.h
index 673822e..6fc4169 100644
--- a/be/src/exec/hash-table.h
+++ b/be/src/exec/hash-table.h
@@ -111,10 +111,12 @@ class HashTableCtx {
   ///  - probe_exprs are used during FindProbeRow()
   ///  - stores_nulls: if false, TupleRows with nulls are ignored during Insert
   ///  - finds_nulls: if finds_nulls[i] is false, FindProbeRow() returns End() for
-  ///      TupleRows with nulls in position i even if stores_nulls is true.
-  ///  - initial_seed: Initial seed value to use when computing hashes for rows with
+  ///        TupleRows with nulls in position i even if stores_nulls is true.
+  ///  - initial_seed: initial seed value to use when computing hashes for rows with
   ///    level 0. Other levels have their seeds derived from this seed.
-  ///  - The max levels we will hash with.
+  ///  - max_levels: the max levels we will hash with.
+  ///  - tracker: the memory tracker of the exec node which owns this hash table context.
+  ///        Memory usage of expression values cache is charged against it.
   /// TODO: stores_nulls is too coarse: for a hash table in which some columns are joined
   ///       with '<=>' and others with '=', stores_nulls could distinguish between columns
   ///       in which nulls are stored and columns in which they are not, which could save
@@ -122,7 +124,18 @@ class HashTableCtx {
   HashTableCtx(const std::vector<ExprContext*>& build_expr_ctxs,
       const std::vector<ExprContext*>& probe_expr_ctxs, bool stores_nulls,
       const std::vector<bool>& finds_nulls, int32_t initial_seed, int max_levels,
-      int num_build_tuples);
+      MemTracker* tracker);
+
+  /// Create a hash table context with the specified parameters, invoke Init() to
+  /// initialize the new hash table context and return it in 'ht_ctx'. Please see header
+  /// comments of HashTableCtx constructor for details of the parameters.
+  /// 'num_build_tuples' is the number of tuples of a row in the build side, used for
+  /// computing the size of a scratch row.
+  static Status Create(RuntimeState* state,
+      const std::vector<ExprContext*>& build_expr_ctxs,
+      const std::vector<ExprContext*>& probe_expr_ctxs, bool stores_nulls,
+      const std::vector<bool>& finds_nulls, int32_t initial_seed, int max_levels,
+      int num_build_tuples, MemTracker* tracker, boost::scoped_ptr<HashTableCtx>* ht_ctx);
 
   /// Call to cleanup any resources.
   void Close();
@@ -135,29 +148,28 @@ class HashTableCtx {
 
   TupleRow* ALWAYS_INLINE scratch_row() const { return scratch_row_; }
 
-  /// Returns the results of the exprs at 'expr_idx' evaluated over the last row
-  /// processed.
+  /// Returns the results of the expression at 'expr_idx' evaluated at the current row.
   /// This value is invalid if the expr evaluated to NULL.
   /// TODO: this is an awkward abstraction but aggregation node can take advantage of
   /// it and save some expr evaluation calls.
-  void* ALWAYS_INLINE last_expr_value(int expr_idx) const {
-    return expr_values_buffer_ + expr_values_buffer_offsets_[expr_idx];
+  void* ALWAYS_INLINE ExprValue(int expr_idx) const {
+    return expr_values_cache_.ExprValuePtr(expr_idx);
   }
 
-  /// Returns if the expr at 'expr_idx' evaluated to NULL for the last row.
-  bool ALWAYS_INLINE last_expr_value_null(int expr_idx) const {
-    return expr_value_null_bits_[expr_idx];
+  /// Returns true if the expression at 'expr_idx' evaluated to NULL for the current row.
+  bool ALWAYS_INLINE ExprValueNull(int expr_idx) const {
+    return static_cast<bool>(*expr_values_cache_.ExprValueNullPtr(expr_idx));
   }
 
-  /// Evaluate and hash the build/probe row, returning in *hash. Returns false if this
-  /// row should be rejected (doesn't need to be processed further) because it
-  /// contains NULL.
-  /// These need to be inlined in the IR module so we can find and replace the calls to
-  /// EvalBuildRow()/EvalProbeRow().
-  bool IR_ALWAYS_INLINE EvalAndHashBuild(TupleRow* row, uint32_t* hash);
-  bool IR_ALWAYS_INLINE EvalAndHashProbe(TupleRow* row, uint32_t* hash);
-
-  int ALWAYS_INLINE results_buffer_size() const { return results_buffer_size_; }
+  /// Evaluate and hash the build/probe row, saving the evaluation to the current row of
+  /// the ExprValuesCache in this hash table context: the results are saved in
+  /// 'cur_expr_values_', the nullness of expressions values in 'cur_expr_values_null_',
+  /// and the hashed expression values in 'cur_expr_values_hash_'. Returns false if this
+  /// row should be rejected (doesn't need to be processed further) because it contains
+  /// NULL. These need to be inlined in the IR module so we can find and replace the
+  /// calls to EvalBuildRow()/EvalProbeRow().
+  bool IR_ALWAYS_INLINE EvalAndHashBuild(TupleRow* row);
+  bool IR_ALWAYS_INLINE EvalAndHashProbe(TupleRow* row);
 
   /// Codegen for evaluating a tuple row.  Codegen'd function matches the signature
   /// for EvalBuildRow and EvalTupleRow.
@@ -165,13 +177,13 @@ class HashTableCtx {
   Status CodegenEvalRow(RuntimeState* state, bool build_row, llvm::Function** fn);
 
   /// Codegen for evaluating a TupleRow and comparing equality against
-  /// 'expr_values_buffer_'.  Function signature matches HashTable::Equals().
+  /// 'cur_expr_values_'.  Function signature matches HashTable::Equals().
   /// 'force_null_equality' is true if the generated equality function should treat
   /// all NULLs as equal. See the template parameter to HashTable::Equals().
   Status CodegenEquals(RuntimeState* state, bool force_null_equality,
       llvm::Function** fn);
 
-  /// Codegen for hashing the expr values in 'expr_values_buffer_'. Function prototype
+  /// Codegen for hashing the expr values in 'cur_expr_values_'. Function prototype
   /// matches HashCurrentRow identically. Unlike HashCurrentRow(), the returned function
   /// only uses a single hash function, rather than switching based on level_.
   /// If 'use_murmur' is true, murmur hash is used, otherwise CRC is used if the hardware
@@ -180,36 +192,198 @@ class HashTableCtx {
 
   static const char* LLVM_CLASS_NAME;
 
+  /// To enable prefetching, the hash table building and probing are pipelined by the
+  /// exec nodes. A set of rows in a row batch will be evaluated and hashed first and
+  /// the corresponding hash table buckets are prefetched before they are probed against
+  /// the hash table. ExprValuesCache is a container for caching the results of
+  /// expression evaluations for the rows in a prefetch set to avoid re-evaluating the
+  /// rows again during probing. Expression evaluation can be very expensive.
+  ///
+  /// The expression evaluation results are cached in the following data structures:
+  ///
+  /// - 'expr_values_array_' is an array caching the results of the rows
+  /// evaluated against either the build or probe expressions. 'cur_expr_values_'
+  /// is a pointer into this array.
+  /// - 'expr_values_null_array_' is an array caching the nullness of each evaluated
+  /// expression in each row. 'cur_expr_values_null_' is a pointer into this array.
+  /// - 'expr_values_hash_array_' is an array of cached hash values of the rows.
+  /// 'cur_expr_values_hash_' is a pointer into this array.
+  /// - 'null_bitmap_' is a bitmap which indicates rows evaluated to NULL.
+  ///
+  /// ExprValuesCache provides an iterator like interface for performing a write pass
+  /// followed by a read pass. We refrain from providing an interface for random accesses
+  /// as there isn't a use case for it now and we want to avoid expensive multiplication
+  /// as the buffer size of each row is not necessarily a power of two:
+  /// - Reset(), ResetForRead(): reset the iterators before writing / reading cached values.
+  /// - NextRow(): moves the iterators to point to the next row of cached values.
+  /// - AtEnd(): returns true if all cached rows have been read. Valid in read mode only.
+  ///
+  /// Various metadata information such as layout of results buffer is also stored in
+  /// this class. Note that the result buffer doesn't store variable length data. It only
+  /// contains pointers to the variable length data (e.g. if an expression value is a
+  /// StringValue).
+  ///
+  class ExprValuesCache {
+   public:
+    ExprValuesCache();
+
+    /// Allocates memory and initializes various data structures. Return error status
+    /// if memory allocation causes the memory limit of the exec node to be exceeded.
+    /// 'tracker' is the memory tracker of the exec node which owns this HashTableCtx.
+    Status Init(RuntimeState* state, MemTracker* tracker,
+        const std::vector<ExprContext*>& build_expr_ctxs);
+
+    /// Frees up various resources and updates memory tracker with proper accounting.
+    /// 'tracker' should be the same memory tracker which was passed in for Init().
+    void Close(MemTracker* tracker);
+
+    /// Resets the cache states (iterators, end pointers etc) before writing.
+    void Reset();
+
+    /// Resets the iterators to the start before reading. Will record the current position
+    /// of the iterators in end pointer before resetting so AtEnd() can determine if all
+    /// cached values have been read.
+    void ResetForRead();
+
+    /// Advances the iterators to the next row by moving to the next entries in the
+    /// arrays of cached values.
+    void ALWAYS_INLINE NextRow();
+
+    /// Compute the total memory usage of this ExprValuesCache.
+    static int MemUsage(int capacity, int results_buffer_size, int num_build_exprs);
+
+    /// Returns the maximum number of rows of expression value states which can be cached.
+    int ALWAYS_INLINE capacity() const { return capacity_; }
+
+    /// Returns the total size in bytes of a row of evaluated expressions' values.
+    int ALWAYS_INLINE expr_values_bytes_per_row() const {
+      return expr_values_bytes_per_row_;
+    }
+
+    /// Returns the offset into the result buffer of the first variable length
+    /// data results.
+    int ALWAYS_INLINE var_result_offset() const { return var_result_offset_; }
+
+    /// Returns true if the current read pass is complete, meaning all cached values
+    /// have been read.
+    bool ALWAYS_INLINE AtEnd() const {
+      return cur_expr_values_hash_ == cur_expr_values_hash_end_;
+    }
+
+    /// Returns true if the current row is null but nulls are not considered in the current
+    /// phase (build or probe).
+    bool ALWAYS_INLINE IsRowNull() const { return null_bitmap_.Get<false>(CurIdx()); }
+
+    /// Record in a bitmap that the current row is null but nulls are not considered in
+    /// the current phase (build or probe).
+    void ALWAYS_INLINE SetRowNull() { null_bitmap_.Set<false>(CurIdx(), true); }
+
+    /// Returns the hash values of the current row.
+    uint32_t ALWAYS_INLINE ExprValuesHash() const { return *cur_expr_values_hash_; }
+
+    /// Sets the hash value for the current row.
+    void ALWAYS_INLINE SetExprValuesHash(uint32_t hash) { *cur_expr_values_hash_ = hash; }
+
+    /// Returns a pointer to the expression value at 'expr_idx' for the current row.
+    uint8_t* ExprValuePtr(int expr_idx) const;
+
+    /// Returns a pointer to the boolean indicating the nullness of the expression value
+    /// at 'expr_idx'.
+    uint8_t* ExprValueNullPtr(int expr_idx) const;
+
+    /// Returns the offset into the results buffer of the expression value at 'expr_idx'.
+    int ALWAYS_INLINE expr_values_offsets(int expr_idx) const {
+      return expr_values_offsets_[expr_idx];
+    }
+
+   private:
+    friend class HashTableCtx;
+
+    /// Resets the iterators to the beginning of the cache values' arrays.
+    void ResetIterators();
+
+    /// Returns the offset in number of rows into the cached values' buffer.
+    int ALWAYS_INLINE CurIdx() const {
+      return cur_expr_values_hash_ - expr_values_hash_array_.get();
+    }
+
+    /// Max amount of memory in bytes for caching evaluated expression values.
+    static const int MAX_EXPR_VALUES_ARRAY_SIZE = 256 << 10;
+
+    /// Maximum number of rows of expression evaluation states which this
+    /// ExprValuesCache can cache.
+    int capacity_;
+
+    /// Byte size of a row of evaluated expression values. Never changes once set,
+    /// can be used for constant substitution during codegen.
+    int expr_values_bytes_per_row_;
+
+    /// Number of build/probe expressions.
+    int num_exprs_;
+
+    /// Pointer into 'expr_values_array_' for the current row's expression values.
+    uint8_t* cur_expr_values_;
+
+    /// Pointer into 'expr_values_null_array_' for the current row's nullness of each
+    /// expression value.
+    uint8_t* cur_expr_values_null_;
+
+    /// Pointer into 'expr_values_hash_array_' for the hash value of current row's
+    /// expression values.
+    uint32_t* cur_expr_values_hash_;
+
+    /// Pointer to the buffer one beyond the end of the last entry of cached expressions'
+    /// hash values.
+    uint32_t* cur_expr_values_hash_end_;
+
+    /// Array for caching up to 'capacity_' number of rows worth of evaluated expression
+    /// values. Each row consumes 'expr_values_bytes_per_row_' number of bytes.
+    boost::scoped_array<uint8_t> expr_values_array_;
+
+    /// Array for caching up to 'capacity_' number of rows worth of null booleans.
+    /// Each row contains 'num_exprs_' booleans to indicate nullness of expression values.
+    /// Used when the hash table supports NULL. Use 'uint8_t' to guarantee each entry is 1
+    /// byte as sizeof(bool) is implementation dependent. The IR depends on this
+    /// assumption.
+    boost::scoped_array<uint8_t> expr_values_null_array_;
+
+    /// Array for caching up to 'capacity_' number of rows worth of hashed values.
+    boost::scoped_array<uint32_t> expr_values_hash_array_;
+
+    /// One bit for each row. A bit is set if that row is not hashed as it's evaluated
+    /// to NULL but the hash table doesn't support NULL. Such rows may still be included
+    /// in outputs for certain join types (e.g. left anti joins).
+    Bitmap null_bitmap_;
+
+    /// Maps from expression index to the byte offset into a row of expression values.
+    /// One entry per build/probe expression.
+    std::vector<int> expr_values_offsets_;
+
+    /// Byte offset into 'cur_expr_values_' that begins the variable length results for
+    /// a row. If -1, there are no variable length slots. Never changes once set, can be
+    /// constant substituted with codegen.
+    int var_result_offset_;
+  };
+
+  ExprValuesCache* ALWAYS_INLINE expr_values_cache() { return &expr_values_cache_; }
+
  private:
   friend class HashTable;
   friend class HashTableTest_HashEmpty_Test;
 
+  /// Allocate various buffers for storing expression evaluation results, hash values,
+  /// null bits etc. Returns error if allocation causes query memory limit to be exceeded.
+  Status Init(RuntimeState* state, int num_build_tuples);
+
   /// Compute the hash of the values in expr_values_buffer_.
   /// This will be replaced by codegen.  We don't want this inlined for replacing
   /// with codegen'd functions so the function name does not change.
-  uint32_t IR_NO_INLINE HashCurrentRow() const {
-    DCHECK_LT(level_, seeds_.size());
-    if (var_result_begin_ == -1) {
-      /// This handles NULLs implicitly since a constant seed value was put
-      /// into results buffer for nulls.
-      /// TODO: figure out which hash function to use. We need to generate uncorrelated
-      /// hashes by changing just the seed. CRC does not have this property and FNV is
-      /// okay. We should switch to something else.
-      return Hash(expr_values_buffer_, results_buffer_size_, seeds_[level_]);
-    } else {
-      return HashTableCtx::HashVariableLenRow();
-    }
-  }
+  uint32_t IR_NO_INLINE HashCurrentRow() const;
 
   /// Wrapper function for calling correct HashUtil function in non-codegen'd case.
-  uint32_t inline Hash(const void* input, int len, uint32_t hash) const {
-    /// Use CRC hash at first level for better performance. Switch to murmur hash at
-    /// subsequent levels since CRC doesn't randomize well with different seed inputs.
-    if (level_ == 0) return HashUtil::Hash(input, len, hash);
-    return HashUtil::MurmurHash2_64(input, len, hash);
-  }
+  uint32_t Hash(const void* input, int len, uint32_t hash) const;
 
-  /// Evaluate 'row' over build exprs caching the results in 'expr_values_buffer_' This
+  /// Evaluate 'row' over build exprs caching the results in 'cur_expr_values_' This
   /// will be replaced by codegen.  We do not want this function inlined when cross
   /// compiled because we need to be able to differentiate between EvalBuildRow and
   /// EvalProbeRow by name and the build/probe exprs are baked into the codegen'd
@@ -218,7 +392,7 @@ class HashTableCtx {
     return EvalRow(row, build_expr_ctxs_);
   }
 
-  /// Evaluate 'row' over probe exprs caching the results in 'expr_values_buffer_'
+  /// Evaluate 'row' over probe exprs caching the results in 'cur_expr_values_'
   /// This will be replaced by codegen.
   bool IR_NO_INLINE EvalProbeRow(TupleRow* row) {
     return EvalRow(row, probe_expr_ctxs_);
@@ -228,15 +402,15 @@ class HashTableCtx {
   /// fields (e.g. strings).
   uint32_t HashVariableLenRow() const;
 
-  /// Evaluate the exprs over row and cache the results in 'expr_values_buffer_'.
+  /// Evaluate the exprs over row and cache the results in 'cur_expr_values_'.
   /// Returns whether any expr evaluated to NULL.
   /// This will be replaced by codegen.
   bool EvalRow(TupleRow* row, const std::vector<ExprContext*>& ctxs);
 
   /// Returns true if the values of build_exprs evaluated over 'build_row' equal the
-  /// values cached in 'expr_values_buffer_'.  This will be replaced by codegen.
+  /// values cached in 'cur_expr_values_'.  This will be replaced by codegen.
   /// FORCE_NULL_EQUALITY is true if all nulls should be treated as equal, regardless
-  /// of the values of finds_nulls_
+  /// of the values of 'finds_nulls_'.
   template<bool FORCE_NULL_EQUALITY>
   bool IR_NO_INLINE Equals(TupleRow* build_row) const;
 
@@ -263,29 +437,16 @@ class HashTableCtx {
   /// The seeds to use for hashing. Indexed by the level.
   std::vector<uint32_t> seeds_;
 
-  /// Cache of exprs values for the current row being evaluated.  This can either
-  /// be a build row (during Insert()) or probe row (during FindProbeRow()).
-  std::vector<int> expr_values_buffer_offsets_;
-
-  /// Byte offset into 'expr_values_buffer_' that begins the variable length results.
-  /// If -1, there are no variable length slots. Never changes once set, can be removed
-  /// with codegen.
-  int var_result_begin_;
-
-  /// Byte size of 'expr_values_buffer_'. Never changes once set, can be removed with
-  /// codegen.
-  int results_buffer_size_;
-
-  /// Buffer to store evaluated expr results.  This address must not change once
-  /// allocated since the address is baked into the codegen.
-  uint8_t* expr_values_buffer_;
-
-  /// Use bytes instead of bools to be compatible with llvm.  This address must
-  /// not change once allocated.
-  uint8_t* expr_value_null_bits_;
+  /// The ExprValuesCache for caching expression evaluation results, null bytes and hash
+  /// values for rows. Used to store results of batch evaluations of rows.
+  ExprValuesCache expr_values_cache_;
 
   /// Scratch buffer to generate rows on the fly.
   TupleRow* scratch_row_;
+
+  /// Memory tracker of the exec node which owns this hash table context. Account the
+  /// memory usage of expression values cache towards it.
+  MemTracker* tracker_;
 };
 
 /// The hash table consists of a contiguous array of buckets that contain a pointer to the
@@ -381,26 +542,27 @@ class HashTable {
   /// the insert fails and this function returns false.
   /// Used during the build phase of hash joins.
   bool IR_ALWAYS_INLINE Insert(HashTableCtx* ht_ctx,
-      const BufferedTupleStream::RowIdx& idx, TupleRow* row, uint32_t hash);
+      const BufferedTupleStream::RowIdx& idx, TupleRow* row);
 
   /// Prefetch the hash table bucket which the given hash value 'hash' maps to.
   template<const bool READ>
   void IR_ALWAYS_INLINE PrefetchBucket(uint32_t hash);
 
-  /// Returns an iterator to the bucket matching the last row evaluated in 'ht_ctx'.
-  /// Returns HashTable::End() if no match is found. The iterator can be iterated until
-  /// HashTable::End() to find all the matching rows. Advancing the returned iterator will
-  /// go to the next matching row. The matching rows do not need to be evaluated since all
-  /// the nodes of a bucket are duplicates. One scan can be in progress for each 'ht_ctx'.
-  /// Used during the probe phase of hash joins.
-  Iterator IR_ALWAYS_INLINE FindProbeRow(HashTableCtx* ht_ctx, uint32_t hash);
+  /// Returns an iterator to the bucket that matches the probe expression results that
+  /// are cached at the current position of the ExprValuesCache in 'ht_ctx'. Assumes that
+  /// the ExprValuesCache was filled using EvalAndHashProbe(). Returns HashTable::End()
+  /// if no match is found. The iterator can be iterated until HashTable::End() to find
+  /// all the matching rows. Advancing the returned iterator will go to the next matching
+  /// row. The matching rows do not need to be evaluated since all the nodes of a bucket
+  /// are duplicates. One scan can be in progress for each 'ht_ctx'. Used in the probe
+  /// phase of hash joins.
+  Iterator IR_ALWAYS_INLINE FindProbeRow(HashTableCtx* ht_ctx);
 
   /// If a match is found in the table, return an iterator as in FindProbeRow(). If a
   /// match was not present, return an iterator pointing to the empty bucket where the key
   /// should be inserted. Returns End() if the table is full. The caller can set the data
   /// in the bucket using a Set*() method on the iterator.
-  Iterator IR_ALWAYS_INLINE FindBuildRowBucket(HashTableCtx* ht_ctx, uint32_t hash,
-      bool* found);
+  Iterator IR_ALWAYS_INLINE FindBuildRowBucket(HashTableCtx* ht_ctx, bool* found);
 
   /// Returns number of elements inserted in the hash table
   int64_t size() const {
@@ -531,6 +693,10 @@ class HashTable {
     /// Returns true if this iterator is at the end, i.e. GetRow() cannot be called.
     bool ALWAYS_INLINE AtEnd() const { return bucket_idx_ == BUCKET_NOT_FOUND; }
 
+    /// Prefetch the hash table bucket which the iterator is pointing to now.
+    template<const bool READ>
+    void IR_ALWAYS_INLINE PrefetchBucket();
+
    private:
     friend class HashTable;
 
@@ -579,28 +745,24 @@ class HashTable {
   /// Using the returned index value, the caller can create an iterator that can be
   /// iterated until End() to find all the matching rows.
   ///
-  /// If 'row' is not NULL, 'row' will be evaluated once against either the build or
-  /// probe exprs (determined by the parameter 'is_build') before calling Equals().
-  /// If 'row' is NULL, EvalAndHashBuild() or EvalAndHashProbe() must have been called
-  /// before calling this function.
+  /// EvalAndHashBuild() or EvalAndHashProbe() must have been called before calling
+  /// this function. The values of the expression values cache in 'ht_ctx' will be
+  /// used to probe the hash table.
   ///
   /// 'FORCE_NULL_EQUALITY' is true if NULLs should always be considered equal when
   /// comparing two rows.
   ///
-  /// 'is_build' indicates which of build or probe exprs is used for lazy evaluation.
-  /// 'row' is the row being probed against the hash table. Used for lazy evaluation.
   /// 'hash' is the hash computed by EvalAndHashBuild() or EvalAndHashProbe().
   /// 'found' indicates that a bucket that contains an equal row is found.
   ///
   /// There are wrappers of this function that perform the Find and Insert logic.
   template <bool FORCE_NULL_EQUALITY>
-  int64_t IR_ALWAYS_INLINE Probe(Bucket* buckets, int64_t num_buckets, bool is_build,
-      HashTableCtx* ht_ctx, TupleRow* row, uint32_t hash, bool* found);
+  int64_t IR_ALWAYS_INLINE Probe(Bucket* buckets, int64_t num_buckets,
+      HashTableCtx* ht_ctx, uint32_t hash, bool* found);
 
   /// Performs the insert logic. Returns the HtData* of the bucket or duplicate node
   /// where the data should be inserted. Returns NULL if the insert was not successful.
-  HtData* IR_ALWAYS_INLINE InsertInternal(HashTableCtx* ht_ctx, TupleRow* row,
-      uint32_t hash);
+  HtData* IR_ALWAYS_INLINE InsertInternal(HashTableCtx* ht_ctx);
 
   /// Updates 'bucket_idx' to the index of the next non-empty bucket. If the bucket has
   /// duplicates, 'node' will be pointing to the head of the linked list of duplicates.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a59408b5/be/src/exec/hash-table.inline.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hash-table.inline.h b/be/src/exec/hash-table.inline.h
index 0d4b1b6..6d33869 100644
--- a/be/src/exec/hash-table.inline.h
+++ b/be/src/exec/hash-table.inline.h
@@ -23,23 +23,30 @@
 
 namespace impala {
 
-inline bool HashTableCtx::EvalAndHashBuild(TupleRow* row, uint32_t* hash) {
+inline bool HashTableCtx::EvalAndHashBuild(TupleRow* row) {
   bool has_null = EvalBuildRow(row);
   if (!stores_nulls_ && has_null) return false;
-  *hash = HashCurrentRow();
+  expr_values_cache_.SetExprValuesHash(HashCurrentRow());
   return true;
 }
 
-inline bool HashTableCtx::EvalAndHashProbe(TupleRow* row, uint32_t* hash) {
+inline bool HashTableCtx::EvalAndHashProbe(TupleRow* row) {
   bool has_null = EvalProbeRow(row);
   if (has_null && !(stores_nulls_ && finds_some_nulls_)) return false;
-  *hash = HashCurrentRow();
+  expr_values_cache_.SetExprValuesHash(HashCurrentRow());
   return true;
 }
 
+inline void HashTableCtx::ExprValuesCache::NextRow() {
+  cur_expr_values_ += expr_values_bytes_per_row_;
+  cur_expr_values_null_ += num_exprs_;
+  ++cur_expr_values_hash_;
+  DCHECK_LE(cur_expr_values_hash_ - expr_values_hash_array_.get(), capacity_);
+}
+
 template <bool FORCE_NULL_EQUALITY>
 inline int64_t HashTable::Probe(Bucket* buckets, int64_t num_buckets,
-    bool is_build, HashTableCtx* ht_ctx, TupleRow* row, uint32_t hash, bool* found) {
+    HashTableCtx* ht_ctx, uint32_t hash, bool* found) {
   DCHECK(buckets != NULL);
   DCHECK_GT(num_buckets, 0);
   *found = false;
@@ -49,20 +56,10 @@ inline int64_t HashTable::Probe(Bucket* buckets, int64_t num_buckets,
   // for knowing when to exit the loop (e.g. by capping the total travel length). In case
   // of quadratic probing it is also used for calculating the length of the next jump.
   int64_t step = 0;
-  bool need_eval = row != NULL;
   do {
     Bucket* bucket = &buckets[bucket_idx];
     if (LIKELY(!bucket->filled)) return bucket_idx;
     if (hash == bucket->hash) {
-      // Evaluate 'row' if needed before calling Equals() for the first time in this loop.
-      if (need_eval) {
-        if (is_build) {
-          ht_ctx->EvalBuildRow(row);
-        } else {
-          ht_ctx->EvalProbeRow(row);
-        }
-        need_eval = false;
-      }
       if (ht_ctx != NULL &&
           ht_ctx->Equals<FORCE_NULL_EQUALITY>(GetRow(bucket, ht_ctx->scratch_row_))) {
         *found = true;
@@ -89,12 +86,11 @@ inline int64_t HashTable::Probe(Bucket* buckets, int64_t num_buckets,
   return Iterator::BUCKET_NOT_FOUND;
 }
 
-inline HashTable::HtData* HashTable::InsertInternal(HashTableCtx* ht_ctx,
-    TupleRow* row, uint32_t hash) {
+inline HashTable::HtData* HashTable::InsertInternal(HashTableCtx* ht_ctx) {
   ++num_probes_;
   bool found = false;
-  int64_t bucket_idx =
-      Probe<true>(buckets_, num_buckets_, true, ht_ctx, row, hash, &found);
+  uint32_t hash = ht_ctx->expr_values_cache()->ExprValuesHash();
+  int64_t bucket_idx = Probe<true>(buckets_, num_buckets_, ht_ctx, hash, &found);
   DCHECK_NE(bucket_idx, Iterator::BUCKET_NOT_FOUND);
   if (found) {
     // We need to insert a duplicate node, note that this may fail to allocate memory.
@@ -108,8 +104,8 @@ inline HashTable::HtData* HashTable::InsertInternal(HashTableCtx* ht_ctx,
 }
 
 inline bool HashTable::Insert(HashTableCtx* ht_ctx,
-    const BufferedTupleStream::RowIdx& idx, TupleRow* row, uint32_t hash) {
-  HtData* htdata = InsertInternal(ht_ctx, row, hash);
+    const BufferedTupleStream::RowIdx& idx, TupleRow* row) {
+  HtData* htdata = InsertInternal(ht_ctx);
   // If successful insert, update the contents of the newly inserted entry with 'idx'.
   if (LIKELY(htdata != NULL)) {
     if (stores_tuples_) {
@@ -133,11 +129,11 @@ inline void HashTable::PrefetchBucket(uint32_t hash) {
   __builtin_prefetch(&buckets_[bucket_idx], READ ? 0 : 1, 1);
 }
 
-inline HashTable::Iterator HashTable::FindProbeRow(HashTableCtx* ht_ctx, uint32_t hash) {
+inline HashTable::Iterator HashTable::FindProbeRow(HashTableCtx* ht_ctx) {
   ++num_probes_;
   bool found = false;
-  int64_t bucket_idx =
-      Probe<false>(buckets_, num_buckets_, false, ht_ctx, NULL, hash, &found);
+  uint32_t hash = ht_ctx->expr_values_cache()->ExprValuesHash();
+  int64_t bucket_idx = Probe<false>(buckets_, num_buckets_, ht_ctx, hash, &found);
   if (found) {
     return Iterator(this, ht_ctx->scratch_row(), bucket_idx,
         buckets_[bucket_idx].bucketData.duplicates);
@@ -147,10 +143,10 @@ inline HashTable::Iterator HashTable::FindProbeRow(HashTableCtx* ht_ctx, uint32_
 
 // TODO: support lazy evaluation like HashTable::Insert().
 inline HashTable::Iterator HashTable::FindBuildRowBucket(
-    HashTableCtx* ht_ctx, uint32_t hash, bool* found) {
+    HashTableCtx* ht_ctx, bool* found) {
   ++num_probes_;
-  int64_t bucket_idx =
-      Probe<true>(buckets_, num_buckets_, false, ht_ctx, NULL, hash, found);
+  uint32_t hash = ht_ctx->expr_values_cache()->ExprValuesHash();
+  int64_t bucket_idx = Probe<true>(buckets_, num_buckets_, ht_ctx, hash, found);
   DuplicateNode* duplicates = LIKELY(bucket_idx != Iterator::BUCKET_NOT_FOUND) ?
       buckets_[bucket_idx].bucketData.duplicates : NULL;
   return Iterator(this, ht_ctx->scratch_row(), bucket_idx, duplicates);
@@ -318,6 +314,16 @@ inline void HashTable::Iterator::SetAtEnd() {
   node_ = NULL;
 }
 
+template<const bool READ>
+inline void HashTable::Iterator::PrefetchBucket() {
+  if (LIKELY(!AtEnd())) {
+    // HashTable::PrefetchBucket() takes a hash value to index into the hash bucket
+    // array. Passing 'bucket_idx_' here is sufficient.
+    DCHECK_EQ((bucket_idx_ & ~(table_->num_buckets_ - 1)), 0);
+    table_->PrefetchBucket<READ>(bucket_idx_);
+  }
+}
+
 inline void HashTable::Iterator::Next() {
   DCHECK(!AtEnd());
   if (table_->buckets_[bucket_idx_].hasDuplicates && node_->next != NULL) {