You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by kw...@apache.org on 2016/09/27 21:38:47 UTC

[1/2] incubator-impala git commit: IMPALA-4104: add DCHECK to ConsumeLocal() and fix tests

Repository: incubator-impala
Updated Branches:
  refs/heads/master f5c7668f9 -> 6cc296ec8


IMPALA-4104: add DCHECK to ConsumeLocal() and fix tests

The TestEnv used for the backend tests did not connect the
MemTracker hierarchy in the expected way. As a result, the (valid)
DCHECK in ConsumeLocal() was triggered in backend tests.

This change fixes TestEnv to set up MemTrackers with the normal
hierarchy, as shown below, and updates the tests that relied on the
previous tracker setup.

(Process)
 |
(Query)----------
 |              |
(Block Mgr) (Fragment instance)

Change-Id: Iadcbe96a9f1bf19872436211b049cebf39b0afe7
Reviewed-on: http://gerrit.cloudera.org:8080/4531
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/4849e586
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/4849e586
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/4849e586

Branch: refs/heads/master
Commit: 4849e5868e959f87db7a874a77c7f2b5711fc7b1
Parents: f5c7668
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Thu Sep 8 21:09:53 2016 -0700
Committer: Internal Jenkins <cl...@gerrit.cloudera.org>
Committed: Tue Sep 27 19:57:44 2016 +0000

----------------------------------------------------------------------
 be/src/exec/hash-table-test.cc               |  10 +-
 be/src/runtime/buffered-block-mgr-test.cc    | 217 +++++++++++-----------
 be/src/runtime/buffered-block-mgr.h          |   1 +
 be/src/runtime/buffered-tuple-stream-test.cc |   6 +-
 be/src/runtime/mem-tracker.h                 |   1 +
 be/src/runtime/test-env.cc                   |  14 +-
 be/src/runtime/test-env.h                    |   5 +-
 7 files changed, 135 insertions(+), 119 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4849e586/be/src/exec/hash-table-test.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hash-table-test.cc b/be/src/exec/hash-table-test.cc
index 1066d2c..d903420 100644
--- a/be/src/exec/hash-table-test.cc
+++ b/be/src/exec/hash-table-test.cc
@@ -184,11 +184,13 @@ class HashTableTest : public testing::Test {
   bool CreateHashTable(bool quadratic, int64_t initial_num_buckets,
       scoped_ptr<HashTable>* table, int block_size = 8 * 1024 * 1024,
       int max_num_blocks = 100, int reserved_blocks = 10) {
-    EXPECT_TRUE(test_env_->CreateQueryState(0, max_num_blocks, block_size,
-        &runtime_state_).ok());
+    EXPECT_OK(
+        test_env_->CreateQueryState(0, max_num_blocks, block_size, &runtime_state_));
+    MemTracker* client_tracker = pool_.Add(
+        new MemTracker(-1, "client", runtime_state_->instance_mem_tracker()));
     BufferedBlockMgr::Client* client;
-    EXPECT_TRUE(runtime_state_->block_mgr()->RegisterClient("", reserved_blocks, false,
-        &tracker_, runtime_state_, &client).ok());
+    EXPECT_OK(runtime_state_->block_mgr()->RegisterClient(
+        "", reserved_blocks, false, client_tracker, runtime_state_, &client));
 
     // Initial_num_buckets must be a power of two.
     EXPECT_EQ(initial_num_buckets, BitUtil::RoundUpToPowerOfTwo(initial_num_buckets));

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4849e586/be/src/runtime/buffered-block-mgr-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/buffered-block-mgr-test.cc b/be/src/runtime/buffered-block-mgr-test.cc
index 5eb1f8c..ae822bb 100644
--- a/be/src/runtime/buffered-block-mgr-test.cc
+++ b/be/src/runtime/buffered-block-mgr-test.cc
@@ -15,27 +15,28 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include <boost/scoped_ptr.hpp>
-#include <boost/bind.hpp>
-#include <boost/thread/thread.hpp>
-#include <boost/filesystem.hpp>
-#include <boost/date_time/posix_time/posix_time.hpp>
 #include <gutil/strings/substitute.h>
 #include <sys/stat.h>
+#include <boost/bind.hpp>
+#include <boost/date_time/posix_time/posix_time.hpp>
+#include <boost/filesystem.hpp>
+#include <boost/scoped_ptr.hpp>
+#include <boost/thread/thread.hpp>
 
-#include "testutil/gtest-util.h"
-#include "common/init.h"
 #include "codegen/llvm-codegen.h"
-#include "runtime/disk-io-mgr.h"
+#include "common/init.h"
+#include "common/object-pool.h"
 #include "runtime/buffered-block-mgr.h"
+#include "runtime/disk-io-mgr.h"
 #include "runtime/exec-env.h"
 #include "runtime/mem-tracker.h"
 #include "runtime/runtime-state.h"
 #include "runtime/test-env.h"
 #include "runtime/tmp-file-mgr.h"
 #include "service/fe-support.h"
-#include "util/disk-info.h"
+#include "testutil/gtest-util.h"
 #include "util/cpu-info.h"
+#include "util/disk-info.h"
 #include "util/filesystem-util.h"
 #include "util/promise.h"
 #include "util/test-info.h"
@@ -73,13 +74,11 @@ class BufferedBlockMgrTest : public ::testing::Test {
 
   virtual void SetUp() {
     test_env_.reset(new TestEnv());
-    client_tracker_.reset(new MemTracker(-1));
   }
 
   virtual void TearDown() {
     TearDownMgrs();
     test_env_.reset();
-    client_tracker_.reset();
 
     // Tests modify permissions, so make sure we can delete if they didn't clean up.
     for (int i = 0; i < created_tmp_dirs_.size(); ++i) {
@@ -87,6 +86,7 @@ class BufferedBlockMgrTest : public ::testing::Test {
     }
     FileSystemUtil::RemovePaths(created_tmp_dirs_);
     created_tmp_dirs_.clear();
+    pool_.Clear();
   }
 
   /// Reinitialize test_env_ to have multiple temporary directories.
@@ -106,8 +106,8 @@ class BufferedBlockMgrTest : public ::testing::Test {
   }
 
   static void ValidateBlock(BufferedBlockMgr::Block* block, int32_t data) {
-    EXPECT_TRUE(block->valid_data_len() == sizeof(int32_t));
-    EXPECT_TRUE(*reinterpret_cast<int32_t*>(block->buffer()) == data);
+    EXPECT_EQ(block->valid_data_len(), sizeof(int32_t));
+    EXPECT_EQ(*reinterpret_cast<int32_t*>(block->buffer()), data);
   }
 
   static int32_t* MakeRandomSizeData(BufferedBlockMgr::Block* block) {
@@ -149,15 +149,22 @@ class BufferedBlockMgrTest : public ::testing::Test {
     return state->block_mgr();
   }
 
+  /// Create a new client tracker as a child of the RuntimeState's instance tracker.
+  MemTracker* NewClientTracker(RuntimeState* state) {
+    return pool_.Add(new MemTracker(-1, "client", state->instance_mem_tracker()));
+  }
+
   BufferedBlockMgr* CreateMgrAndClient(int64_t query_id, int max_buffers, int block_size,
-      int reserved_blocks, bool tolerates_oversubscription, MemTracker* tracker,
+      int reserved_blocks, bool tolerates_oversubscription,
       BufferedBlockMgr::Client** client, RuntimeState** query_state = NULL,
       TQueryOptions* query_options = NULL) {
     RuntimeState* state;
     BufferedBlockMgr* mgr = CreateMgr(query_id, max_buffers, block_size, &state,
         query_options);
-    EXPECT_TRUE(mgr->RegisterClient(Substitute("Client for query $0", query_id),
-        reserved_blocks, tolerates_oversubscription, tracker, state, client).ok());
+
+    MemTracker* client_tracker = NewClientTracker(state);
+    EXPECT_OK(mgr->RegisterClient(Substitute("Client for query $0", query_id),
+        reserved_blocks, tolerates_oversubscription, client_tracker, state, client));
     EXPECT_TRUE(client != NULL);
     if (query_state != NULL) *query_state = state;
     return mgr;
@@ -165,13 +172,11 @@ class BufferedBlockMgrTest : public ::testing::Test {
 
   void CreateMgrsAndClients(int64_t start_query_id, int num_mgrs, int buffers_per_mgr,
       int block_size, int reserved_blocks_per_client, bool tolerates_oversubscription,
-      MemTracker* tracker, vector<BufferedBlockMgr*>* mgrs,
-      vector<BufferedBlockMgr::Client*>* clients) {
+      vector<BufferedBlockMgr*>* mgrs, vector<BufferedBlockMgr::Client*>* clients) {
     for (int i = 0; i < num_mgrs; ++i) {
       BufferedBlockMgr::Client* client;
       BufferedBlockMgr* mgr = CreateMgrAndClient(start_query_id + i, buffers_per_mgr,
-          block_size_, reserved_blocks_per_client, tolerates_oversubscription,
-          tracker, &client);
+          block_size_, reserved_blocks_per_client, tolerates_oversubscription, &client);
       mgrs->push_back(mgr);
       clients->push_back(client);
     }
@@ -179,9 +184,9 @@ class BufferedBlockMgrTest : public ::testing::Test {
 
   // Destroy all created query states and associated block managers.
   void TearDownMgrs() {
-    // Freeing all block managers should clean up all consumed memory.
+    // Tear down the query states, which DCHECKs that the memory consumption of
+    // the query's trackers is zero.
     test_env_->TearDownQueryStates();
-    EXPECT_EQ(test_env_->block_mgr_parent_tracker()->consumption(), 0);
   }
 
   void AllocateBlocks(BufferedBlockMgr* block_mgr, BufferedBlockMgr::Client* client,
@@ -292,9 +297,8 @@ class BufferedBlockMgrTest : public ::testing::Test {
     vector<BufferedBlockMgr::Block*> blocks;
     BufferedBlockMgr* block_mgr;
     BufferedBlockMgr::Client* client;
-    block_mgr = CreateMgrAndClient(0, max_num_blocks, block_size, 0, false,
-        client_tracker_.get(), &client);
-    EXPECT_EQ(test_env_->block_mgr_parent_tracker()->consumption(), 0);
+    block_mgr = CreateMgrAndClient(0, max_num_blocks, block_size, 0, false, &client);
+    EXPECT_EQ(test_env_->TotalQueryMemoryConsumption(), 0);
 
     // Allocate blocks until max_num_blocks, they should all succeed and memory
     // usage should go up.
@@ -317,7 +321,7 @@ class BufferedBlockMgrTest : public ::testing::Test {
     uint8_t* old_buffer = first_block->buffer();
     status = block_mgr->GetNewBlock(client, first_block, &new_block);
     EXPECT_TRUE(new_block != NULL);
-    EXPECT_TRUE(old_buffer == new_block->buffer());
+    EXPECT_EQ(old_buffer, new_block->buffer());
     EXPECT_EQ(block_mgr->bytes_allocated(), max_num_blocks * block_size);
     EXPECT_TRUE(!first_block->is_pinned());
     blocks.push_back(new_block);
@@ -338,8 +342,7 @@ class BufferedBlockMgrTest : public ::testing::Test {
     int max_num_buffers = 5;
     BufferedBlockMgr* block_mgr;
     BufferedBlockMgr::Client* client;
-    block_mgr = CreateMgrAndClient(0, max_num_buffers, block_size, 0, false,
-        client_tracker_.get(), &client);
+    block_mgr = CreateMgrAndClient(0, max_num_buffers, block_size, 0, false, &client);
 
     // Check counters.
     RuntimeProfile* profile = block_mgr->profile();
@@ -398,8 +401,8 @@ class BufferedBlockMgrTest : public ::testing::Test {
     ApiFunction api_function;
 
     BufferedBlockMgr::Client* client;
-    EXPECT_OK(block_mgr->RegisterClient("", 0, false, client_tracker_.get(), state,
-        &client));
+    EXPECT_OK(
+        block_mgr->RegisterClient("", 0, false, NewClientTracker(state), state, &client));
     EXPECT_TRUE(client != NULL);
 
     pinned_blocks.reserve(num_buffers);
@@ -543,9 +546,9 @@ class BufferedBlockMgrTest : public ::testing::Test {
     const int iters = 10000;
     for (int i = 0; i < iters; ++i) {
       shared_ptr<BufferedBlockMgr> mgr;
-      Status status = BufferedBlockMgr::Create(state,
-          test_env_->block_mgr_parent_tracker(), state->runtime_profile(),
-          test_env_->tmp_file_mgr(), block_size_ * num_buffers, block_size_, &mgr);
+      Status status = BufferedBlockMgr::Create(state, state->query_mem_tracker(),
+          state->runtime_profile(), test_env_->tmp_file_mgr(), block_size_ * num_buffers,
+          block_size_, &mgr);
     }
   }
 
@@ -557,9 +560,11 @@ class BufferedBlockMgrTest : public ::testing::Test {
     // Create a shared RuntimeState with no BufferedBlockMgr.
     RuntimeState* shared_state =
         new RuntimeState(TExecPlanFragmentParams(), test_env_->exec_env());
+    shared_state->InitMemTrackers(TUniqueId(), NULL, -1);
+
     for (int i = 0; i < num_threads; ++i) {
-      thread* t = new thread(bind(
-          &BufferedBlockMgrTest::CreateDestroyThread, this, shared_state));
+      thread* t = new thread(
+          bind(&BufferedBlockMgrTest::CreateDestroyThread, this, shared_state));
       workers.add_thread(t);
     }
     workers.join_all();
@@ -578,7 +583,7 @@ class BufferedBlockMgrTest : public ::testing::Test {
   void TestRuntimeStateTeardown(bool write_error, bool wait_for_writes);
 
   scoped_ptr<TestEnv> test_env_;
-  scoped_ptr<MemTracker> client_tracker_;
+  ObjectPool pool_;
   vector<string> created_tmp_dirs_;
 };
 
@@ -593,9 +598,9 @@ TEST_F(BufferedBlockMgrTest, GetNewBlockSmallBlocks) {
   int max_num_blocks = 3;
   BufferedBlockMgr* block_mgr;
   BufferedBlockMgr::Client* client;
-  block_mgr = CreateMgrAndClient(0, max_num_blocks, block_size, 0, false,
-      client_tracker_.get(), &client);
-  EXPECT_EQ(0, test_env_->block_mgr_parent_tracker()->consumption());
+  block_mgr = CreateMgrAndClient(0, max_num_blocks, block_size, 0, false, &client);
+  MemTracker* client_tracker = block_mgr->get_tracker(client);
+  EXPECT_EQ(0, test_env_->TotalQueryMemoryConsumption());
 
   vector<BufferedBlockMgr::Block*> blocks;
 
@@ -604,8 +609,8 @@ TEST_F(BufferedBlockMgrTest, GetNewBlockSmallBlocks) {
   EXPECT_OK(block_mgr->GetNewBlock(client, NULL, &new_block, 128));
   EXPECT_TRUE(new_block != NULL);
   EXPECT_EQ(block_mgr->bytes_allocated(), 0);
-  EXPECT_EQ(test_env_->block_mgr_parent_tracker()->consumption(), 0);
-  EXPECT_EQ(client_tracker_->consumption(), 128);
+  EXPECT_EQ(block_mgr->mem_tracker()->consumption(), 0);
+  EXPECT_EQ(client_tracker->consumption(), 128);
   EXPECT_TRUE(new_block->is_pinned());
   EXPECT_EQ(new_block->BytesRemaining(), 128);
   EXPECT_TRUE(new_block->buffer() != NULL);
@@ -615,9 +620,8 @@ TEST_F(BufferedBlockMgrTest, GetNewBlockSmallBlocks) {
   EXPECT_OK(block_mgr->GetNewBlock(client, NULL, &new_block));
   EXPECT_TRUE(new_block != NULL);
   EXPECT_EQ(block_mgr->bytes_allocated(), block_mgr->max_block_size());
-  EXPECT_EQ(test_env_->block_mgr_parent_tracker()->consumption(),
-            block_mgr->max_block_size());
-  EXPECT_EQ(client_tracker_->consumption(), 128 + block_mgr->max_block_size());
+  EXPECT_EQ(block_mgr->mem_tracker()->consumption(), block_mgr->max_block_size());
+  EXPECT_EQ(client_tracker->consumption(), 128 + block_mgr->max_block_size());
   EXPECT_TRUE(new_block->is_pinned());
   EXPECT_EQ(new_block->BytesRemaining(), block_mgr->max_block_size());
   EXPECT_TRUE(new_block->buffer() != NULL);
@@ -627,9 +631,8 @@ TEST_F(BufferedBlockMgrTest, GetNewBlockSmallBlocks) {
   EXPECT_OK(block_mgr->GetNewBlock(client, NULL, &new_block, 512));
   EXPECT_TRUE(new_block != NULL);
   EXPECT_EQ(block_mgr->bytes_allocated(), block_mgr->max_block_size());
-  EXPECT_EQ(test_env_->block_mgr_parent_tracker()->consumption(),
-            block_mgr->max_block_size());
-  EXPECT_EQ(client_tracker_->consumption(), 128 + 512 + block_mgr->max_block_size());
+  EXPECT_EQ(block_mgr->mem_tracker()->consumption(), block_mgr->max_block_size());
+  EXPECT_EQ(client_tracker->consumption(), 128 + 512 + block_mgr->max_block_size());
   EXPECT_TRUE(new_block->is_pinned());
   EXPECT_EQ(new_block->BytesRemaining(), 512);
   EXPECT_TRUE(new_block->buffer() != NULL);
@@ -652,8 +655,7 @@ TEST_F(BufferedBlockMgrTest, Pin) {
   const int block_size = 1024;
   BufferedBlockMgr* block_mgr;
   BufferedBlockMgr::Client* client;
-  block_mgr = CreateMgrAndClient(0, max_num_blocks, block_size, 0, false,
-      client_tracker_.get(), &client);
+  block_mgr = CreateMgrAndClient(0, max_num_blocks, block_size, 0, false, &client);
 
   vector<BufferedBlockMgr::Block*> blocks;
   AllocateBlocks(block_mgr, client, max_num_blocks, &blocks);
@@ -703,8 +705,7 @@ TEST_F(BufferedBlockMgrTest, Deletion) {
   const int block_size = 1024;
   BufferedBlockMgr* block_mgr;
   BufferedBlockMgr::Client* client;
-  block_mgr = CreateMgrAndClient(0, max_num_buffers, block_size, 0, false,
-      client_tracker_.get(), &client);
+  block_mgr = CreateMgrAndClient(0, max_num_buffers, block_size, 0, false, &client);
 
   // Check counters.
   RuntimeProfile* profile = block_mgr->profile();
@@ -713,13 +714,13 @@ TEST_F(BufferedBlockMgrTest, Deletion) {
 
   vector<BufferedBlockMgr::Block*> blocks;
   AllocateBlocks(block_mgr, client, max_num_buffers, &blocks);
-  EXPECT_TRUE(created_cnt->value() == max_num_buffers);
+  EXPECT_EQ(created_cnt->value(), max_num_buffers);
 
   DeleteBlocks(blocks);
   blocks.clear();
   AllocateBlocks(block_mgr, client, max_num_buffers, &blocks);
-  EXPECT_TRUE(created_cnt->value() == max_num_buffers);
-  EXPECT_TRUE(recycled_cnt->value() == max_num_buffers);
+  EXPECT_EQ(created_cnt->value(), max_num_buffers);
+  EXPECT_EQ(recycled_cnt->value(), max_num_buffers);
 
   DeleteBlocks(blocks);
   TearDownMgrs();
@@ -730,8 +731,9 @@ TEST_F(BufferedBlockMgrTest, Deletion) {
 TEST_F(BufferedBlockMgrTest, DeleteSingleBlocks) {
   int max_num_buffers = 16;
   BufferedBlockMgr::Client* client;
-  BufferedBlockMgr* block_mgr = CreateMgrAndClient(0, max_num_buffers, block_size_,
-      0, false, client_tracker_.get(), &client);
+  BufferedBlockMgr* block_mgr =
+      CreateMgrAndClient(0, max_num_buffers, block_size_, 0, false, &client);
+  MemTracker* client_tracker = block_mgr->get_tracker(client);
 
   // Pinned I/O block.
   BufferedBlockMgr::Block* new_block;
@@ -740,16 +742,16 @@ TEST_F(BufferedBlockMgrTest, DeleteSingleBlocks) {
   EXPECT_TRUE(new_block->is_pinned());
   EXPECT_TRUE(new_block->is_max_size());
   new_block->Delete();
-  EXPECT_TRUE(client_tracker_->consumption() == 0);
+  EXPECT_EQ(0, client_tracker->consumption());
 
   // Pinned non-I/O block.
   int small_block_size = 128;
   EXPECT_OK(block_mgr->GetNewBlock(client, NULL, &new_block, small_block_size));
   EXPECT_TRUE(new_block != NULL);
   EXPECT_TRUE(new_block->is_pinned());
-  EXPECT_EQ(small_block_size, client_tracker_->consumption());
+  EXPECT_EQ(small_block_size, client_tracker->consumption());
   new_block->Delete();
-  EXPECT_EQ(0, client_tracker_->consumption());
+  EXPECT_EQ(0, client_tracker->consumption());
 
   // Unpinned I/O block - delete after written to disk.
   EXPECT_OK(block_mgr->GetNewBlock(client, NULL, &new_block));
@@ -760,7 +762,7 @@ TEST_F(BufferedBlockMgrTest, DeleteSingleBlocks) {
   EXPECT_FALSE(new_block->is_pinned());
   WaitForWrites(block_mgr);
   new_block->Delete();
-  EXPECT_TRUE(client_tracker_->consumption() == 0);
+  EXPECT_EQ(client_tracker->consumption(), 0);
 
   // Unpinned I/O block - delete before written to disk.
   EXPECT_OK(block_mgr->GetNewBlock(client, NULL, &new_block));
@@ -771,7 +773,7 @@ TEST_F(BufferedBlockMgrTest, DeleteSingleBlocks) {
   EXPECT_FALSE(new_block->is_pinned());
   new_block->Delete();
   WaitForWrites(block_mgr);
-  EXPECT_TRUE(client_tracker_->consumption() == 0);
+  EXPECT_EQ(client_tracker->consumption(), 0);
 
   TearDownMgrs();
 }
@@ -792,8 +794,8 @@ TEST_F(BufferedBlockMgrTest, TransferBufferDuringWrite) {
   const int max_num_buffers = 2;
   BufferedBlockMgr::Client* client;
   RuntimeState* query_state;
-  BufferedBlockMgr* block_mgr = CreateMgrAndClient(0, max_num_buffers, block_size_,
-      1, false, client_tracker_.get(), &client, &query_state);
+  BufferedBlockMgr* block_mgr = CreateMgrAndClient(
+      0, max_num_buffers, block_size_, 1, false, &client, &query_state);
 
   for (int trial = 0; trial < trials; ++trial) {
     for (int delay_ms = 0; delay_ms <= 10; delay_ms += 5) {
@@ -811,8 +813,8 @@ TEST_F(BufferedBlockMgrTest, TransferBufferDuringWrite) {
       // number of buffers.
       int reserved_buffers = trial % max_num_buffers;
       BufferedBlockMgr::Client* tmp_client;
-      EXPECT_TRUE(block_mgr->RegisterClient("tmp_client", reserved_buffers, false,
-          client_tracker_.get(), query_state, &tmp_client).ok());
+      EXPECT_OK(block_mgr->RegisterClient("tmp_client", reserved_buffers, false,
+          NewClientTracker(query_state), query_state, &tmp_client));
       BufferedBlockMgr::Block* tmp_block;
       ASSERT_OK(block_mgr->GetNewBlock(tmp_client, NULL, &tmp_block));
 
@@ -838,8 +840,7 @@ TEST_F(BufferedBlockMgrTest, Close) {
   const int block_size = 1024;
   BufferedBlockMgr* block_mgr;
   BufferedBlockMgr::Client* client;
-  block_mgr = CreateMgrAndClient(0, max_num_buffers, block_size, 0, false,
-      client_tracker_.get(), &client);
+  block_mgr = CreateMgrAndClient(0, max_num_buffers, block_size, 0, false, &client);
 
   vector<BufferedBlockMgr::Block*> blocks;
   AllocateBlocks(block_mgr, client, max_num_buffers, &blocks);
@@ -866,8 +867,8 @@ TEST_F(BufferedBlockMgrTest, DestructDuringWrite) {
 
   for (int trial = 0; trial < trials; ++trial) {
     BufferedBlockMgr::Client* client;
-    BufferedBlockMgr* block_mgr = CreateMgrAndClient(0, max_num_buffers, block_size_,
-        0, false, client_tracker_.get(), &client);
+    BufferedBlockMgr* block_mgr =
+        CreateMgrAndClient(0, max_num_buffers, block_size_, 0, false, &client);
 
     vector<BufferedBlockMgr::Block*> blocks;
     AllocateBlocks(block_mgr, client, max_num_buffers, &blocks);
@@ -884,20 +885,23 @@ TEST_F(BufferedBlockMgrTest, DestructDuringWrite) {
   // Destroying test environment will check that all writes have completed.
 }
 
-void BufferedBlockMgrTest::TestRuntimeStateTeardown(bool write_error,
-    bool wait_for_writes) {
+void BufferedBlockMgrTest::TestRuntimeStateTeardown(
+    bool write_error, bool wait_for_writes) {
   const int max_num_buffers = 10;
   RuntimeState* state;
   BufferedBlockMgr::Client* client;
-  CreateMgrAndClient(0, max_num_buffers, block_size_, 0, false, client_tracker_.get(),
-      &client, &state);
+  CreateMgrAndClient(0, max_num_buffers, block_size_, 0, false, &client, &state);
 
-  // Hold another reference to block mgr so that it can outlive runtime state.
+  // Hold extra references to block mgr and query mem tracker so they can outlive runtime
+  // state.
   shared_ptr<BufferedBlockMgr> block_mgr;
-  Status status = BufferedBlockMgr::Create(state, test_env_->block_mgr_parent_tracker(),
+  shared_ptr<MemTracker> query_mem_tracker;
+  Status status = BufferedBlockMgr::Create(state, state->query_mem_tracker(),
       state->runtime_profile(), test_env_->tmp_file_mgr(), 0, block_size_, &block_mgr);
   ASSERT_TRUE(status.ok());
   ASSERT_TRUE(block_mgr != NULL);
+  query_mem_tracker = MemTracker::GetQueryMemTracker(
+      state->query_id(), -1, test_env_->exec_env()->process_mem_tracker());
 
   vector<BufferedBlockMgr::Block*> blocks;
   AllocateBlocks(block_mgr.get(), client, max_num_buffers, &blocks);
@@ -927,7 +931,7 @@ void BufferedBlockMgrTest::TestRuntimeStateTeardown(bool write_error,
   if (wait_for_writes) WaitForWrites(block_mgr.get());
   block_mgr.reset();
 
-  EXPECT_TRUE(test_env_->block_mgr_parent_tracker()->consumption() == 0);
+  EXPECT_EQ(test_env_->TotalQueryMemoryConsumption(), 0);
 }
 
 TEST_F(BufferedBlockMgrTest, RuntimeStateTeardown) {
@@ -947,8 +951,8 @@ TEST_F(BufferedBlockMgrTest, WriteCompleteWithCancelledRuntimeState) {
   const int max_num_buffers = 10;
   RuntimeState* state;
   BufferedBlockMgr::Client* client;
-  BufferedBlockMgr* block_mgr = CreateMgrAndClient(0, max_num_buffers, block_size_, 0,
-      false, client_tracker_.get(), &client, &state);
+  BufferedBlockMgr* block_mgr =
+      CreateMgrAndClient(0, max_num_buffers, block_size_, 0, false, &client, &state);
 
   vector<BufferedBlockMgr::Block*> blocks;
   AllocateBlocks(block_mgr, client, max_num_buffers, &blocks);
@@ -984,8 +988,7 @@ TEST_F(BufferedBlockMgrTest, WriteError) {
   const int block_size = 1024;
   BufferedBlockMgr* block_mgr;
   BufferedBlockMgr::Client* client;
-  block_mgr = CreateMgrAndClient(0, max_num_buffers, block_size, 0, false,
-      client_tracker_.get(), &client);
+  block_mgr = CreateMgrAndClient(0, max_num_buffers, block_size, 0, false, &client);
 
   vector<BufferedBlockMgr::Block*> blocks;
   AllocateBlocks(block_mgr, client, max_num_buffers, &blocks);
@@ -1013,8 +1016,8 @@ TEST_F(BufferedBlockMgrTest, WriteError) {
 TEST_F(BufferedBlockMgrTest, TmpFileAllocateError) {
   int max_num_buffers = 2;
   BufferedBlockMgr::Client* client;
-  BufferedBlockMgr* block_mgr = CreateMgrAndClient(0, max_num_buffers, block_size_, 0,
-      false, client_tracker_.get(), &client);
+  BufferedBlockMgr* block_mgr =
+      CreateMgrAndClient(0, max_num_buffers, block_size_, 0, false, &client);
 
   vector<BufferedBlockMgr::Block*> blocks;
   AllocateBlocks(block_mgr, client, max_num_buffers, &blocks);
@@ -1047,8 +1050,8 @@ TEST_F(BufferedBlockMgrTest, DISABLED_WriteErrorBlacklist) {
   int blocks_per_mgr = MAX_NUM_BLOCKS / NUM_BLOCK_MGRS;
   vector<BufferedBlockMgr*> block_mgrs;
   vector<BufferedBlockMgr::Client*> clients;
-  CreateMgrsAndClients(0, NUM_BLOCK_MGRS, blocks_per_mgr, block_size_, 0, false,
-      client_tracker_.get(), &block_mgrs, &clients);
+  CreateMgrsAndClients(
+      0, NUM_BLOCK_MGRS, blocks_per_mgr, block_size_, 0, false, &block_mgrs, &clients);
 
   // Allocate files for all 2x2 combinations by unpinning blocks.
   vector<vector<BufferedBlockMgr::Block*>> blocks;
@@ -1106,8 +1109,8 @@ TEST_F(BufferedBlockMgrTest, DISABLED_WriteErrorBlacklist) {
   }
   // A new block manager should only use the good dir for backing storage.
   BufferedBlockMgr::Client* new_client;
-  BufferedBlockMgr* new_block_mgr = CreateMgrAndClient(9999, blocks_per_mgr, block_size_,
-      0, false, client_tracker_.get(), &new_client);
+  BufferedBlockMgr* new_block_mgr =
+      CreateMgrAndClient(9999, blocks_per_mgr, block_size_, 0, false, &new_client);
   vector<BufferedBlockMgr::Block*> new_mgr_blocks;
   AllocateBlocks(new_block_mgr, new_client, blocks_per_mgr, &new_mgr_blocks);
   UnpinBlocks(new_mgr_blocks);
@@ -1130,8 +1133,8 @@ TEST_F(BufferedBlockMgrTest, AllocationErrorHandling) {
   vector<RuntimeState*> runtime_states;
   vector<BufferedBlockMgr*> block_mgrs;
   vector<BufferedBlockMgr::Client*> clients;
-  CreateMgrsAndClients(0, num_block_mgrs, blocks_per_mgr, block_size_, 0,
-      false, client_tracker_.get(), &block_mgrs, &clients);
+  CreateMgrsAndClients(
+      0, num_block_mgrs, blocks_per_mgr, block_size_, 0, false, &block_mgrs, &clients);
 
   // Allocate files for all 2x2 combinations by unpinning blocks.
   vector<vector<BufferedBlockMgr::Block*>> blocks;
@@ -1164,8 +1167,8 @@ TEST_F(BufferedBlockMgrTest, NoDirsAllocationError) {
   vector<string> tmp_dirs = InitMultipleTmpDirs(2);
   int max_num_buffers = 2;
   BufferedBlockMgr::Client* client;
-  BufferedBlockMgr* block_mgr = CreateMgrAndClient(0, max_num_buffers, block_size_,
-      0, false, client_tracker_.get(), &client);
+  BufferedBlockMgr* block_mgr =
+      CreateMgrAndClient(0, max_num_buffers, block_size_, 0, false, &client);
   vector<BufferedBlockMgr::Block*> blocks;
   AllocateBlocks(block_mgr, client, max_num_buffers, &blocks);
   for (int i = 0; i < tmp_dirs.size(); ++i) {
@@ -1183,8 +1186,8 @@ TEST_F(BufferedBlockMgrTest, NoTmpDirs) {
   InitMultipleTmpDirs(0);
   int max_num_buffers = 3;
   BufferedBlockMgr::Client* client;
-  BufferedBlockMgr* block_mgr = CreateMgrAndClient(0, max_num_buffers, block_size_,
-      0, false, client_tracker_.get(), &client);
+  BufferedBlockMgr* block_mgr =
+      CreateMgrAndClient(0, max_num_buffers, block_size_, 0, false, &client);
   vector<BufferedBlockMgr::Block*> blocks;
   AllocateBlocks(block_mgr, client, max_num_buffers, &blocks);
   DeleteBlocks(blocks);
@@ -1198,7 +1201,7 @@ TEST_F(BufferedBlockMgrTest, ScratchLimitZero) {
   TQueryOptions query_options;
   query_options.scratch_limit = 0;
   BufferedBlockMgr* block_mgr = CreateMgrAndClient(0, max_num_buffers, block_size_,
-      0, false, client_tracker_.get(), &client, NULL, &query_options);
+      0, false, &client, NULL, &query_options);
   vector<BufferedBlockMgr::Block*> blocks;
   AllocateBlocks(block_mgr, client, max_num_buffers, &blocks);
   DeleteBlocks(blocks);
@@ -1215,11 +1218,11 @@ TEST_F(BufferedBlockMgrTest, MultipleClients) {
 
   BufferedBlockMgr::Client* client1 = NULL;
   BufferedBlockMgr::Client* client2 = NULL;
-  EXPECT_OK(block_mgr->RegisterClient("", client1_buffers, false, client_tracker_.get(),
-      runtime_state, &client1));
+  EXPECT_OK(block_mgr->RegisterClient("", client1_buffers, false,
+      NewClientTracker(runtime_state), runtime_state, &client1));
   EXPECT_TRUE(client1 != NULL);
-  EXPECT_OK(block_mgr->RegisterClient("", client2_buffers, false, client_tracker_.get(),
-      runtime_state, &client2));
+  EXPECT_OK(block_mgr->RegisterClient("", client2_buffers, false,
+      NewClientTracker(runtime_state), runtime_state, &client2));
   EXPECT_TRUE(client2 != NULL);
 
   // Reserve client 1's and 2's buffers. They should succeed.
@@ -1317,11 +1320,11 @@ TEST_F(BufferedBlockMgrTest, MultipleClientsExtraBuffers) {
   BufferedBlockMgr::Client* client1 = NULL;
   BufferedBlockMgr::Client* client2 = NULL;
   BufferedBlockMgr::Block* block = NULL;
-  EXPECT_OK(block_mgr->RegisterClient("", client1_buffers, false, client_tracker_.get(),
-      runtime_state, &client1));
+  EXPECT_OK(block_mgr->RegisterClient("", client1_buffers, false,
+      NewClientTracker(runtime_state), runtime_state, &client1));
   EXPECT_TRUE(client1 != NULL);
-  EXPECT_OK(block_mgr->RegisterClient("", client2_buffers, false, client_tracker_.get(),
-      runtime_state, &client2));
+  EXPECT_OK(block_mgr->RegisterClient("", client2_buffers, false,
+      NewClientTracker(runtime_state), runtime_state, &client2));
   EXPECT_TRUE(client2 != NULL);
 
   vector<BufferedBlockMgr::Block*> client1_blocks;
@@ -1367,14 +1370,14 @@ TEST_F(BufferedBlockMgrTest, ClientOversubscription) {
   BufferedBlockMgr::Client* client2 = NULL;
   BufferedBlockMgr::Client* client3 = NULL;
   BufferedBlockMgr::Block* block = NULL;
-  EXPECT_OK(block_mgr->RegisterClient("", client1_buffers, false, client_tracker_.get(),
-      runtime_state, &client1));
+  EXPECT_OK(block_mgr->RegisterClient("", client1_buffers, false,
+      NewClientTracker(runtime_state), runtime_state, &client1));
   EXPECT_TRUE(client1 != NULL);
-  EXPECT_OK(block_mgr->RegisterClient("", client2_buffers, false, client_tracker_.get(),
-      runtime_state, &client2));
+  EXPECT_OK(block_mgr->RegisterClient("", client2_buffers, false,
+      NewClientTracker(runtime_state), runtime_state, &client2));
   EXPECT_TRUE(client2 != NULL);
-  EXPECT_OK(block_mgr->RegisterClient("", client3_buffers, true, client_tracker_.get(),
-      runtime_state, &client3));
+  EXPECT_OK(block_mgr->RegisterClient("", client3_buffers, true,
+      NewClientTracker(runtime_state), runtime_state, &client3));
   EXPECT_TRUE(client3 != NULL);
 
   // Client one allocates first block, should work.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4849e586/be/src/runtime/buffered-block-mgr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/buffered-block-mgr.h b/be/src/runtime/buffered-block-mgr.h
index ad8ad85..b715ec5 100644
--- a/be/src/runtime/buffered-block-mgr.h
+++ b/be/src/runtime/buffered-block-mgr.h
@@ -393,6 +393,7 @@ class BufferedBlockMgr {
 
   int num_pinned_buffers(Client* client) const;
   int num_reserved_buffers_remaining(Client* client) const;
+  MemTracker* mem_tracker() const { return mem_tracker_.get(); };
   MemTracker* get_tracker(Client* client) const;
   int64_t max_block_size() const { return max_block_size_; }
   int64_t bytes_allocated() const;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4849e586/be/src/runtime/buffered-tuple-stream-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/buffered-tuple-stream-test.cc b/be/src/runtime/buffered-tuple-stream-test.cc
index 5049e8b..8d07584 100644
--- a/be/src/runtime/buffered-tuple-stream-test.cc
+++ b/be/src/runtime/buffered-tuple-stream-test.cc
@@ -102,8 +102,10 @@ class SimpleTupleStreamTest : public testing::Test {
   /// tracked by tracker_.
   void InitBlockMgr(int64_t limit, int block_size) {
     ASSERT_OK(test_env_->CreateQueryState(0, limit, block_size, &runtime_state_));
-    ASSERT_OK(runtime_state_->block_mgr()->RegisterClient("", 0, false, &tracker_,
-        runtime_state_, &client_));
+    MemTracker* client_tracker = pool_.Add(
+        new MemTracker(-1, "client", runtime_state_->instance_mem_tracker()));
+    ASSERT_OK(runtime_state_->block_mgr()->RegisterClient(
+        "", 0, false, client_tracker, runtime_state_, &client_));
   }
 
   /// Generate the ith element of a sequence of int values.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4849e586/be/src/runtime/mem-tracker.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/mem-tracker.h b/be/src/runtime/mem-tracker.h
index a2c3e9b..17b8ba3 100644
--- a/be/src/runtime/mem-tracker.h
+++ b/be/src/runtime/mem-tracker.h
@@ -140,6 +140,7 @@ class MemTracker {
       DCHECK(!all_trackers_[i]->has_limit());
       all_trackers_[i]->consumption_->Add(bytes);
     }
+    DCHECK(false) << "end_tracker is not an ancestor";
   }
 
   void ReleaseLocal(int64_t bytes, MemTracker* end_tracker) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4849e586/be/src/runtime/test-env.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/test-env.cc b/be/src/runtime/test-env.cc
index c5a9a41..f0caab7 100644
--- a/be/src/runtime/test-env.cc
+++ b/be/src/runtime/test-env.cc
@@ -37,7 +37,6 @@ TestEnv::TestEnv() {
   exec_env_.reset(new ExecEnv);
   exec_env_->InitForFeTests();
   io_mgr_tracker_.reset(new MemTracker(-1));
-  block_mgr_parent_tracker_.reset(new MemTracker(-1));
   exec_env_->disk_io_mgr()->Init(io_mgr_tracker_.get());
   InitMetrics();
   tmp_file_mgr_.reset(new TmpFileMgr);
@@ -59,7 +58,6 @@ void TestEnv::InitTmpFileMgr(const std::vector<std::string>& tmp_dirs,
 TestEnv::~TestEnv() {
   // Queries must be torn down first since they are dependent on global state.
   TearDownQueryStates();
-  block_mgr_parent_tracker_.reset();
   exec_env_.reset();
   io_mgr_tracker_.reset();
   tmp_file_mgr_.reset();
@@ -82,12 +80,13 @@ Status TestEnv::CreateQueryState(int64_t query_id, int max_buffers, int block_si
     return Status("Unexpected error creating RuntimeState");
   }
 
+  (*runtime_state)->InitMemTrackers(TUniqueId(), NULL, -1);
+
   shared_ptr<BufferedBlockMgr> mgr;
   RETURN_IF_ERROR(BufferedBlockMgr::Create(*runtime_state,
-      block_mgr_parent_tracker_.get(), (*runtime_state)->runtime_profile(),
+      (*runtime_state)->query_mem_tracker(), (*runtime_state)->runtime_profile(),
       tmp_file_mgr_.get(), CalculateMemLimit(max_buffers, block_size), block_size, &mgr));
   (*runtime_state)->set_block_mgr(mgr);
-  (*runtime_state)->InitMemTrackers(TUniqueId(), NULL, -1);
 
   query_states_.push_back(shared_ptr<RuntimeState>(*runtime_state));
   return Status::OK();
@@ -116,4 +115,11 @@ int64_t TestEnv::CalculateMemLimit(int max_buffers, int block_size) {
   return max_buffers * static_cast<int64_t>(block_size);
 }
 
+int64_t TestEnv::TotalQueryMemoryConsumption() {
+  int64_t total = 0;
+  for (shared_ptr<RuntimeState>& query_state : query_states_) {
+    total += query_state->query_mem_tracker()->consumption();
+  }
+  return total;
+}
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4849e586/be/src/runtime/test-env.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/test-env.h b/be/src/runtime/test-env.h
index a3ab29a..d30424d 100644
--- a/be/src/runtime/test-env.h
+++ b/be/src/runtime/test-env.h
@@ -54,8 +54,10 @@ class TestEnv {
   /// If max_buffers is -1, no memory limit will apply.
   int64_t CalculateMemLimit(int max_buffers, int block_size);
 
+  /// Return total of mem tracker consumption for all queries.
+  int64_t TotalQueryMemoryConsumption();
+
   ExecEnv* exec_env() { return exec_env_.get(); }
-  MemTracker* block_mgr_parent_tracker() { return block_mgr_parent_tracker_.get(); }
   MemTracker* io_mgr_tracker() { return io_mgr_tracker_.get(); }
   MetricGroup* metrics() { return metrics_.get(); }
   TmpFileMgr* tmp_file_mgr() { return tmp_file_mgr_.get(); }
@@ -72,7 +74,6 @@ class TestEnv {
   /// Global state for test environment.
   static boost::scoped_ptr<MetricGroup> static_metrics_;
   boost::scoped_ptr<ExecEnv> exec_env_;
-  boost::scoped_ptr<MemTracker> block_mgr_parent_tracker_;
   boost::scoped_ptr<MemTracker> io_mgr_tracker_;
   boost::scoped_ptr<MetricGroup> metrics_;
   boost::scoped_ptr<TmpFileMgr> tmp_file_mgr_;


[2/2] incubator-impala git commit: IMPALA-4008: Don't bake ExprContext pointers into IR code

Posted by kw...@apache.org.
IMPALA-4008: Don't bake ExprContext pointers into IR code

To allow generated code to be shared across multiple fragment
instances, this change removes the ExprContext pointers baked
into various IR functions (e.g. AGG/PAGG/hash-table).

Change-Id: I42039eed803a39fa716b9ed647510b6440974ae5
Reviewed-on: http://gerrit.cloudera.org:8080/4390
Reviewed-by: Michael Ho <kw...@cloudera.com>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/6cc296ec
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/6cc296ec
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/6cc296ec

Branch: refs/heads/master
Commit: 6cc296ec85a4260446333f89b1f5df0d7bd1ec95
Parents: 4849e58
Author: Michael Ho <kw...@cloudera.com>
Authored: Fri Sep 9 00:56:46 2016 -0700
Committer: Internal Jenkins <cl...@gerrit.cloudera.org>
Committed: Tue Sep 27 20:20:17 2016 +0000

----------------------------------------------------------------------
 be/src/codegen/gen_ir_descriptions.py          |   6 +
 be/src/codegen/llvm-codegen.cc                 |   4 +
 be/src/codegen/llvm-codegen.h                  |   3 +
 be/src/exec/aggregation-node-ir.cc             |   8 +
 be/src/exec/aggregation-node.cc                | 180 +++++++++++-------
 be/src/exec/aggregation-node.h                 |  20 +-
 be/src/exec/hash-table-ir.cc                   |   4 +
 be/src/exec/hash-table.cc                      |  50 +++--
 be/src/exec/hash-table.h                       |  26 ++-
 be/src/exec/partitioned-aggregation-node-ir.cc |   8 +-
 be/src/exec/partitioned-aggregation-node.cc    | 192 ++++++++++++--------
 be/src/exec/partitioned-aggregation-node.h     |  19 +-
 be/src/exprs/agg-fn-evaluator.h                |   4 +-
 13 files changed, 349 insertions(+), 175 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6cc296ec/be/src/codegen/gen_ir_descriptions.py
----------------------------------------------------------------------
diff --git a/be/src/codegen/gen_ir_descriptions.py b/be/src/codegen/gen_ir_descriptions.py
index 9ba9d78..a12d73d 100755
--- a/be/src/codegen/gen_ir_descriptions.py
+++ b/be/src/codegen/gen_ir_descriptions.py
@@ -44,6 +44,8 @@ options, args = parser.parse_args()
 ir_functions = [
   ["AGG_NODE_PROCESS_ROW_BATCH_WITH_GROUPING", "ProcessRowBatchWithGrouping"],
   ["AGG_NODE_PROCESS_ROW_BATCH_NO_GROUPING", "ProcessRowBatchNoGrouping"],
+  ["AGG_NODE_GET_EXPR_CTX", "GetAggExprCtx"],
+  ["AGG_NODE_GET_FN_CTX", "GetAggFnCtx"],
   ["PART_AGG_NODE_PROCESS_BATCH_UNAGGREGATED",
       "PartitionedAggregationNode12ProcessBatchILb0"],
   ["PART_AGG_NODE_PROCESS_BATCH_AGGREGATED",
@@ -52,6 +54,8 @@ ir_functions = [
       "PartitionedAggregationNode22ProcessBatchNoGrouping"],
   ["PART_AGG_NODE_PROCESS_BATCH_STREAMING",
       "PartitionedAggregationNode21ProcessBatchStreaming"],
+  ["PART_AGG_NODE_GET_EXPR_CTX",
+      "PartitionedAggregationNode17GetAggExprContext"],
   ["AVG_UPDATE_BIGINT", "9AvgUpdateIN10impala_udf9BigIntVal"],
   ["AVG_UPDATE_DOUBLE", "9AvgUpdateIN10impala_udf9DoubleVal"],
   ["AVG_UPDATE_TIMESTAMP", "TimestampAvgUpdate"],
@@ -89,6 +93,8 @@ ir_functions = [
   ["PHJ_PROCESS_PROBE_BATCH_FULL_OUTER_JOIN", "ProcessProbeBatchILi8"],
   ["PHJ_INSERT_BATCH", "9Partition11InsertBatch"],
   ["HASH_TABLE_GET_HASH_SEED", "GetHashSeed"],
+  ["HASH_TABLE_GET_BUILD_EXPR_CTX", "HashTableCtx15GetBuildExprCtx"],
+  ["HASH_TABLE_GET_PROBE_EXPR_CTX", "HashTableCtx15GetProbeExprCtx"],
   ["HLL_UPDATE_BOOLEAN", "HllUpdateIN10impala_udf10BooleanVal"],
   ["HLL_UPDATE_TINYINT", "HllUpdateIN10impala_udf10TinyIntVal"],
   ["HLL_UPDATE_SMALLINT", "HllUpdateIN10impala_udf11SmallIntVal"],

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6cc296ec/be/src/codegen/llvm-codegen.cc
----------------------------------------------------------------------
diff --git a/be/src/codegen/llvm-codegen.cc b/be/src/codegen/llvm-codegen.cc
index b2a3b18..b107c51 100644
--- a/be/src/codegen/llvm-codegen.cc
+++ b/be/src/codegen/llvm-codegen.cc
@@ -581,6 +581,10 @@ PointerType* LlvmCodeGen::GetPtrType(Type* type) {
   return PointerType::get(type, 0);
 }
 
+PointerType* LlvmCodeGen::GetPtrPtrType(Type* type) {
+  return PointerType::get(PointerType::get(type, 0), 0);
+}
+
 // Llvm doesn't let you create a PointerValue from a c-side ptr.  Instead
 // cast it to an int and then to 'type'.
 Value* LlvmCodeGen::CastPtrToLlvmPtr(Type* type, const void* ptr) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6cc296ec/be/src/codegen/llvm-codegen.h
----------------------------------------------------------------------
diff --git a/be/src/codegen/llvm-codegen.h b/be/src/codegen/llvm-codegen.h
index fa9f1b1..2ef936f 100644
--- a/be/src/codegen/llvm-codegen.h
+++ b/be/src/codegen/llvm-codegen.h
@@ -218,6 +218,9 @@ class LlvmCodeGen {
   /// Return a pointer type to 'type'
   llvm::PointerType* GetPtrType(llvm::Type* type);
 
+  /// Return a pointer to pointer type to 'type'.
+  llvm::PointerType* GetPtrPtrType(llvm::Type* type);
+
   /// Returns llvm type for the column type
   llvm::Type* GetType(const ColumnType& type);
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6cc296ec/be/src/exec/aggregation-node-ir.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/aggregation-node-ir.cc b/be/src/exec/aggregation-node-ir.cc
index 185196a..8050b58 100644
--- a/be/src/exec/aggregation-node-ir.cc
+++ b/be/src/exec/aggregation-node-ir.cc
@@ -28,6 +28,14 @@ using namespace impala;
 // Functions in this file are cross compiled to IR with clang.  These functions
 // are modified at runtime with a query specific codegen'd UpdateAggTuple
 
+FunctionContext* AggregationNode::GetAggFnCtx(int i) const {
+  return agg_fn_ctxs_[i];
+}
+
+ExprContext* AggregationNode::GetAggExprCtx(int i) const {
+  return agg_expr_ctxs_[i];
+}
+
 void AggregationNode::ProcessRowBatchNoGrouping(RowBatch* batch) {
   for (int i = 0; i < batch->num_rows(); ++i) {
     UpdateTuple(singleton_intermediate_tuple_, batch->GetRow(i));

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6cc296ec/be/src/exec/aggregation-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/aggregation-node.cc b/be/src/exec/aggregation-node.cc
index 2b9550a..909d42b 100644
--- a/be/src/exec/aggregation-node.cc
+++ b/be/src/exec/aggregation-node.cc
@@ -81,6 +81,18 @@ Status AggregationNode::Init(const TPlanNode& tnode, RuntimeState* state) {
     RETURN_IF_ERROR(AggFnEvaluator::Create(
         pool_, tnode.agg_node.aggregate_functions[i], &evaluator));
     aggregate_evaluators_.push_back(evaluator);
+    ExprContext* agg_expr_ctx;
+    if (evaluator->input_expr_ctxs().size() == 1) {
+      agg_expr_ctx = evaluator->input_expr_ctxs()[0];
+    } else {
+      // CodegenUpdateSlot() can only support aggregate operator with only one ExprContext
+      // so it doesn't support operator such as group_concat. There are also aggregate
+      // operators with no ExprContext (e.g. count(*)). In cases above, 'agg_expr_ctxs_'
+      // will contain NULL for that entry.
+      DCHECK(evaluator->agg_op() == AggFnEvaluator::OTHER || evaluator->is_count_star());
+      agg_expr_ctx = NULL;
+    }
+    agg_expr_ctxs_.push_back(agg_expr_ctx);
   }
   return Status::OK();
 }
@@ -302,6 +314,7 @@ void AggregationNode::Close(RuntimeState* state) {
   if (tuple_pool_.get() != NULL) tuple_pool_->FreeAll();
   if (hash_tbl_.get() != NULL) hash_tbl_->Close();
 
+  agg_expr_ctxs_.clear();
   DCHECK(agg_fn_ctxs_.empty() || aggregate_evaluators_.size() == agg_fn_ctxs_.size());
   for (int i = 0; i < aggregate_evaluators_.size(); ++i) {
     aggregate_evaluators_[i]->Close(state);
@@ -432,23 +445,26 @@ IRFunction::Type GetHllUpdateFunction2(const ColumnType& type) {
 }
 
 // IR Generation for updating a single aggregation slot. Signature is:
-// void UpdateSlot(FunctionContext* fn_ctx, AggTuple* agg_tuple, char** row)
+// void UpdateSlot(FunctionContext* fn_ctx, ExprContext* expr_ctx,
+//     AggTuple* agg_tuple, char** row)
 //
 // The IR for sum(double_col) is:
 // define void @UpdateSlot(%"class.impala_udf::FunctionContext"* %fn_ctx,
-//                         { i8, double }* %agg_tuple,
-//                         %"class.impala::TupleRow"* %row) #20 {
+//                         %"class.impala::ExprContext"* %expr_ctx,
+//                         { i8, [7 x i8], double }* %agg_tuple,
+//                         %"class.impala::TupleRow"* %row) #34 {
 // entry:
-//   %src = call { i8, double } @GetSlotRef(%"class.impala::ExprContext"* inttoptr
-//     (i64 128241264 to %"class.impala::ExprContext"*), %"class.impala::TupleRow"* %row)
+//   %src = call { i8, double } @GetSlotRef(%"class.impala::ExprContext"* %expr_ctx,
+//                                          %"class.impala::TupleRow"* %row)
 //   %0 = extractvalue { i8, double } %src, 0
 //   %is_null = trunc i8 %0 to i1
 //   br i1 %is_null, label %ret, label %src_not_null
 //
 // src_not_null:                                     ; preds = %entry
-//   %dst_slot_ptr = getelementptr inbounds { i8, double }* %agg_tuple, i32 0, i32 1
-//   call void @SetNotNull({ i8, double }* %agg_tuple)
-//   %dst_val = load double* %dst_slot_ptr
+//   %dst_slot_ptr = getelementptr inbounds { i8, [7 x i8], double },
+//       { i8, [7 x i8], double }* %agg_tuple, i32 0, i32 2
+//   call void @SetNotNull({ i8, [7 x i8], double }* %agg_tuple)
+//   %dst_val = load double, double* %dst_slot_ptr
 //   %val = extractvalue { i8, double } %src, 1
 //   %1 = fadd double %dst_val, %val
 //   store double %1, double* %dst_slot_ptr
@@ -460,25 +476,27 @@ IRFunction::Type GetHllUpdateFunction2(const ColumnType& type) {
 //
 // The IR for ndv(double_col) is:
 // define void @UpdateSlot(%"class.impala_udf::FunctionContext"* %fn_ctx,
-//                         { i8, %"struct.impala::StringValue" }* %agg_tuple,
-//                         %"class.impala::TupleRow"* %row) #20 {
+//                         %"class.impala::ExprContext"* %expr_ctx,
+//                         { i8, [7 x i8], %"struct.impala::StringValue" }* %agg_tuple,
+//                         %"class.impala::TupleRow"* %row) #34 {
 // entry:
 //   %dst_lowered_ptr = alloca { i64, i8* }
-//   %src_lowered_ptr = alloca { i8, double }
-//   %src = call { i8, double } @GetSlotRef(%"class.impala::ExprContext"* inttoptr
-//     (i64 120530832 to %"class.impala::ExprContext"*), %"class.impala::TupleRow"* %row)
-//   %0 = extractvalue { i8, double } %src, 0
-//   %is_null = trunc i8 %0 to i1
+//   %src_lowered_ptr = alloca { i64, i8* }
+//   %src = call { i64, i8* } @GetSlotRef(%"class.impala::ExprContext"* %expr_ctx,
+//                                        %"class.impala::TupleRow"* %row)
+//   %0 = extractvalue { i64, i8* } %src, 0
+//   %is_null = trunc i64 %0 to i1
 //   br i1 %is_null, label %ret, label %src_not_null
 //
 // src_not_null:                                     ; preds = %entry
-//   %dst_slot_ptr = getelementptr inbounds
-//     { i8, %"struct.impala::StringValue" }* %agg_tuple, i32 0, i32 1
-//   call void @SetNotNull({ i8, %"struct.impala::StringValue" }* %agg_tuple)
-//   %dst_val = load %"struct.impala::StringValue"* %dst_slot_ptr
-//   store { i8, double } %src, { i8, double }* %src_lowered_ptr
-//   %src_unlowered_ptr = bitcast { i8, double }* %src_lowered_ptr
-//                        to %"struct.impala_udf::DoubleVal"*
+//   %dst_slot_ptr = getelementptr inbounds { i8, [7 x i8], %"struct.impala::StringValue" },
+//       { i8, [7 x i8], %"struct.impala::StringValue" }* %agg_tuple, i32 0, i32 2
+//   call void @SetNotNull({ i8, [7 x i8], %"struct.impala::StringValue" }* %agg_tuple)
+//   %dst_val =
+//       load %"struct.impala::StringValue", %"struct.impala::StringValue"* %dst_slot_ptr
+//   store { i64, i8* } %src, { i64, i8* }* %src_lowered_ptr
+//   %src_unlowered_ptr =
+//       bitcast { i64, i8* }* %src_lowered_ptr to %"struct.impala_udf::StringVal"*
 //   %ptr = extractvalue %"struct.impala::StringValue" %dst_val, 0
 //   %dst_stringval = insertvalue { i64, i8* } zeroinitializer, i8* %ptr, 1
 //   %len = extractvalue %"struct.impala::StringValue" %dst_val, 1
@@ -489,18 +507,18 @@ IRFunction::Type GetHllUpdateFunction2(const ColumnType& type) {
 //   %5 = or i64 %4, %3
 //   %dst_stringval1 = insertvalue { i64, i8* } %dst_stringval, i64 %5, 0
 //   store { i64, i8* } %dst_stringval1, { i64, i8* }* %dst_lowered_ptr
-//   %dst_unlowered_ptr = bitcast { i64, i8* }* %dst_lowered_ptr
-//                        to %"struct.impala_udf::StringVal"*
-//   call void @HllUpdate(%"class.impala_udf::FunctionContext"* %fn_ctx,
-//                        %"struct.impala_udf::DoubleVal"* %src_unlowered_ptr,
-//                        %"struct.impala_udf::StringVal"* %dst_unlowered_ptr)
-//   %anyval_result = load { i64, i8* }* %dst_lowered_ptr
-//   %6 = extractvalue { i64, i8* } %anyval_result, 1
-//   %7 = insertvalue %"struct.impala::StringValue" zeroinitializer, i8* %6, 0
-//   %8 = extractvalue { i64, i8* } %anyval_result, 0
-//   %9 = ashr i64 %8, 32
-//   %10 = trunc i64 %9 to i32
-//   %11 = insertvalue %"struct.impala::StringValue" %7, i32 %10, 1
+//   %dst_unlowered_ptr =
+//       bitcast { i64, i8* }* %dst_lowered_ptr to %"struct.impala_udf::StringVal"*
+//   call void @HllMerge(%"class.impala_udf::FunctionContext"* %fn_ctx,
+//                       %"struct.impala_udf::StringVal"* %src_unlowered_ptr,
+//                       %"struct.impala_udf::StringVal"* %dst_unlowered_ptr)
+//   %anyval_result = load { i64, i8* }, { i64, i8* }* %dst_lowered_ptr
+//   %6 = extractvalue { i64, i8* } %anyval_result, 0
+//   %7 = ashr i64 %6, 32
+//   %8 = trunc i64 %7 to i32
+//   %9 = insertvalue %"struct.impala::StringValue" zeroinitializer, i32 %8, 1
+//   %10 = extractvalue { i64, i8* } %anyval_result, 1
+//   %11 = insertvalue %"struct.impala::StringValue" %9, i8* %10, 0
 //   store %"struct.impala::StringValue" %11, %"struct.impala::StringValue"* %dst_slot_ptr
 //   br label %ret
 //
@@ -512,6 +530,8 @@ llvm::Function* AggregationNode::CodegenUpdateSlot(
   LlvmCodeGen* codegen;
   if (!state->GetCodegen(&codegen).ok()) return NULL;
 
+  // TODO: Fix this DCHECK and Init() once CodegenUpdateSlot() can handle AggFnEvaluator
+  // with multiple input expressions (e.g. group_concat).
   DCHECK_EQ(evaluator->input_expr_ctxs().size(), 1);
   ExprContext* input_expr_ctx = evaluator->input_expr_ctxs()[0];
   Expr* input_expr = input_expr_ctx->root();
@@ -525,33 +545,34 @@ llvm::Function* AggregationNode::CodegenUpdateSlot(
   }
   DCHECK(agg_expr_fn != NULL);
 
-  PointerType* fn_ctx_type =
+  PointerType* fn_ctx_ptr_type =
       codegen->GetPtrType(FunctionContextImpl::LLVM_FUNCTIONCONTEXT_NAME);
+  PointerType* expr_ctx_ptr_type = codegen->GetPtrType(ExprContext::LLVM_CLASS_NAME);
   StructType* tuple_struct = intermediate_tuple_desc_->GetLlvmStruct(codegen);
-  PointerType* tuple_ptr_type = PointerType::get(tuple_struct, 0);
+  PointerType* tuple_ptr_type = codegen->GetPtrType(tuple_struct);
   PointerType* tuple_row_ptr_type = codegen->GetPtrType(TupleRow::LLVM_CLASS_NAME);
 
   // Create UpdateSlot prototype
   LlvmCodeGen::FnPrototype prototype(codegen, "UpdateSlot", codegen->void_type());
-  prototype.AddArgument(LlvmCodeGen::NamedVariable("fn_ctx", fn_ctx_type));
+  prototype.AddArgument(LlvmCodeGen::NamedVariable("fn_ctx", fn_ctx_ptr_type));
+  prototype.AddArgument(LlvmCodeGen::NamedVariable("expr_ctx", expr_ctx_ptr_type));
   prototype.AddArgument(LlvmCodeGen::NamedVariable("agg_tuple", tuple_ptr_type));
   prototype.AddArgument(LlvmCodeGen::NamedVariable("row", tuple_row_ptr_type));
 
   LlvmCodeGen::LlvmBuilder builder(codegen->context());
-  Value* args[3];
+  Value* args[4];
   Function* fn = prototype.GeneratePrototype(&builder, &args[0]);
   Value* fn_ctx_arg = args[0];
-  Value* agg_tuple_arg = args[1];
-  Value* row_arg = args[2];
+  Value* expr_ctx_arg = args[1];
+  Value* agg_tuple_arg = args[2];
+  Value* row_arg = args[3];
 
   BasicBlock* src_not_null_block =
       BasicBlock::Create(codegen->context(), "src_not_null", fn);
   BasicBlock* ret_block = BasicBlock::Create(codegen->context(), "ret", fn);
 
   // Call expr function to get src slot value
-  Value* ctx_arg = codegen->CastPtrToLlvmPtr(
-      codegen->GetPtrType(ExprContext::LLVM_CLASS_NAME), input_expr_ctx);
-  Value* agg_expr_fn_args[] = { ctx_arg, row_arg };
+  Value* agg_expr_fn_args[] = { expr_ctx_arg, row_arg };
   CodegenAnyVal src = CodegenAnyVal::CreateCallWrapped(
       codegen, &builder, input_expr->type(), agg_expr_fn, agg_expr_fn_args, "src");
 
@@ -660,23 +681,38 @@ llvm::Function* AggregationNode::CodegenUpdateSlot(
 // For the query:
 // select count(*), count(int_col), sum(double_col) the IR looks like:
 //
+// ; Function Attrs: alwaysinline
 // define void @UpdateTuple(%"class.impala::AggregationNode"* %this_ptr,
 //                          %"class.impala::Tuple"* %agg_tuple,
-//                          %"class.impala::TupleRow"* %tuple_row) #20 {
+//                          %"class.impala::TupleRow"* %tuple_row) #34 {
 // entry:
-//   %tuple = bitcast %"class.impala::Tuple"* %agg_tuple to { i8, i64, i64, double }*
-//   %src_slot = getelementptr inbounds { i8, i64, i64, double }* %tuple, i32 0, i32 1
-//   %count_star_val = load i64* %src_slot
+//   %tuple =
+//       bitcast %"class.impala::Tuple"* %agg_tuple to { i8, [7 x i8], i64, i64, double }*
+//   %src_slot = getelementptr inbounds { i8, [7 x i8], i64, i64, double },
+//       { i8, [7 x i8], i64, i64, double }* %tuple, i32 0, i32 2
+//   %count_star_val = load i64, i64* %src_slot
 //   %count_star_inc = add i64 %count_star_val, 1
 //   store i64 %count_star_inc, i64* %src_slot
-//   call void @UpdateSlot(%"class.impala_udf::FunctionContext"* inttoptr
-//                           (i64 44521296 to %"class.impala_udf::FunctionContext"*),
-//                         { i8, i64, i64, double }* %tuple,
+//   %0 = call %"class.impala_udf::FunctionContext"*
+//       @_ZNK6impala15AggregationNode11GetAggFnCtxEi(
+//           %"class.impala::AggregationNode"* %this_ptr, i32 1)
+//   %1 = call %"class.impala::ExprContext"*
+//       @_ZNK6impala15AggregationNode13GetAggExprCtxEi(
+//           %"class.impala::AggregationNode"* %this_ptr, i32 1)
+//   call void @UpdateSlot(%"class.impala_udf::FunctionContext"* %0,
+//                         %"class.impala::ExprContext"* %1,
+//                         { i8, [7 x i8], i64, i64, double }* %tuple,
 //                         %"class.impala::TupleRow"* %tuple_row)
-//   call void @UpdateSlot5(%"class.impala_udf::FunctionContext"* inttoptr
-//                            (i64 44521328 to %"class.impala_udf::FunctionContext"*),
-//                          { i8, i64, i64, double }* %tuple,
-//                          %"class.impala::TupleRow"* %tuple_row)
+//   %2 = call %"class.impala_udf::FunctionContext"*
+//       @_ZNK6impala15AggregationNode11GetAggFnCtxEi(
+//           %"class.impala::AggregationNode"* %this_ptr, i32 2)
+//   %3 = call %"class.impala::ExprContext"*
+//       @_ZNK6impala15AggregationNode13GetAggExprCtxEi(
+//           %"class.impala::AggregationNode"* %this_ptr, i32 2)
+//   call void @UpdateSlot.3(%"class.impala_udf::FunctionContext"* %2,
+//                           %"class.impala::ExprContext"* %3,
+//                           { i8, [7 x i8], i64, i64, double }* %tuple,
+//                           %"class.impala::TupleRow"* %tuple_row)
 //   ret void
 // }
 Function* AggregationNode::CodegenUpdateTuple(RuntimeState* state) {
@@ -721,12 +757,13 @@ Function* AggregationNode::CodegenUpdateTuple(RuntimeState* state) {
   DCHECK(agg_tuple_type != NULL);
   DCHECK(tuple_row_type != NULL);
 
-  PointerType* agg_node_ptr_type = PointerType::get(agg_node_type, 0);
-  PointerType* agg_tuple_ptr_type = PointerType::get(agg_tuple_type, 0);
-  PointerType* tuple_row_ptr_type = PointerType::get(tuple_row_type, 0);
+  PointerType* agg_node_ptr_type = codegen->GetPtrType(agg_node_type);
+  PointerType* agg_tuple_ptr_type = codegen->GetPtrType(agg_tuple_type);
+  PointerType* tuple_row_ptr_type = codegen->GetPtrType(tuple_row_type);
 
   // Signature for UpdateTuple is
-  // void UpdateTuple(AggregationNode* this, Tuple* tuple, TupleRow* row)
+  // void UpdateTuple(AggregationNode* this, FunctionContext** fn_ctx,
+  //     ExprContext** expr_ctx, Tuple* tuple, TupleRow* row)
   // This signature needs to match the non-codegen'd signature exactly.
   StructType* tuple_struct = intermediate_tuple_desc_->GetLlvmStruct(codegen);
   PointerType* tuple_ptr = PointerType::get(tuple_struct, 0);
@@ -741,7 +778,15 @@ Function* AggregationNode::CodegenUpdateTuple(RuntimeState* state) {
 
   // Cast the parameter types to the internal llvm runtime types.
   // TODO: get rid of this by using right type in function signature
-  args[1] = builder.CreateBitCast(args[1], tuple_ptr, "tuple");
+  Value* this_arg = args[0];
+  Value* agg_tuple_arg = builder.CreateBitCast(args[1], tuple_ptr, "tuple");
+  Value* row_arg = args[2];
+
+  Function* get_fn_ctx_fn = codegen->GetFunction(IRFunction::AGG_NODE_GET_FN_CTX, false);
+  DCHECK(get_fn_ctx_fn != NULL);
+  Function* get_expr_ctx_fn =
+      codegen->GetFunction(IRFunction::AGG_NODE_GET_EXPR_CTX, false);
+  DCHECK(get_expr_ctx_fn != NULL);
 
   // Loop over each expr and generate the IR for that slot.  If the expr is not
   // count(*), generate a helper IR function to update the slot and call that.
@@ -754,18 +799,23 @@ Function* AggregationNode::CodegenUpdateTuple(RuntimeState* state) {
       // increment the slot by the number of rows in the batch.
       int field_idx = slot_desc->llvm_field_idx();
       Value* const_one = codegen->GetIntConstant(TYPE_BIGINT, 1);
-      Value* slot_ptr = builder.CreateStructGEP(NULL, args[1], field_idx, "src_slot");
+      Value* slot_ptr = builder.CreateStructGEP(NULL, agg_tuple_arg, field_idx,
+          "src_slot");
       Value* slot_loaded = builder.CreateLoad(slot_ptr, "count_star_val");
       Value* count_inc = builder.CreateAdd(slot_loaded, const_one, "count_star_inc");
       builder.CreateStore(count_inc, slot_ptr);
     } else {
       Function* update_slot_fn = CodegenUpdateSlot(state, evaluator, slot_desc);
       if (update_slot_fn == NULL) return NULL;
-      Value* fn_ctx_arg = codegen->CastPtrToLlvmPtr(
-          codegen->GetPtrType(FunctionContextImpl::LLVM_FUNCTIONCONTEXT_NAME),
-          agg_fn_ctxs_[i]);
-      builder.CreateCall(update_slot_fn,
-          ArrayRef<Value*>({fn_ctx_arg, args[1], args[2]}));
+      // Call GetAggFnCtx() to get the function context.
+      Value* get_fn_ctx_args[] = { this_arg, codegen->GetIntConstant(TYPE_INT, i) };
+      Value* fn_ctx = builder.CreateCall(get_fn_ctx_fn, get_fn_ctx_args);
+      // Call GetAggExprCtx() to get the expression context.
+      DCHECK(agg_expr_ctxs_[i] != NULL);
+      Value* get_expr_ctx_args[] = { this_arg, codegen->GetIntConstant(TYPE_INT, i) };
+      Value* expr_ctx = builder.CreateCall(get_expr_ctx_fn, get_expr_ctx_args);
+      Value* update_slot_args[] = { fn_ctx, expr_ctx, agg_tuple_arg, row_arg };
+      builder.CreateCall(update_slot_fn, update_slot_args);
     }
   }
   builder.CreateRetVoid();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6cc296ec/be/src/exec/aggregation-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/aggregation-node.h b/be/src/exec/aggregation-node.h
index eaaf97c..5d87d82 100644
--- a/be/src/exec/aggregation-node.h
+++ b/be/src/exec/aggregation-node.h
@@ -69,12 +69,20 @@ class AggregationNode : public ExecNode {
   boost::scoped_ptr<OldHashTable> hash_tbl_;
   OldHashTable::Iterator output_iterator_;
 
+  /// The list of all aggregate operations for this exec node.
   std::vector<AggFnEvaluator*> aggregate_evaluators_;
 
-  /// FunctionContext for each agg fn and backing pool.
+  /// FunctionContexts and backing MemPools of 'aggregate_evaluators_'.
+  /// FunctionContexts objects are stored in ObjectPool of RuntimeState.
   std::vector<impala_udf::FunctionContext*> agg_fn_ctxs_;
   boost::scoped_ptr<MemPool> agg_fn_pool_;
 
+  /// Cache of the ExprContexts of 'aggregate_evaluators_'. Used in the codegen'ed
+  /// version of UpdateTuple() to avoid loading aggregate_evaluators_[i] at runtime.
+  /// An entry is NULL if the aggregate evaluator is not codegen'ed or there is no
+  /// Expr in the aggregate evaluator (e.g. count(*)).
+  std::vector<ExprContext*> agg_expr_ctxs_;
+
   /// Exprs used to evaluate input rows
   std::vector<ExprContext*> probe_expr_ctxs_;
   /// Exprs used to insert constructed aggregation tuple into the hash table.
@@ -124,7 +132,7 @@ class AggregationNode : public ExecNode {
   Tuple* ConstructIntermediateTuple();
 
   /// Updates the aggregation intermediate tuple 'tuple' with aggregation values
-  /// computed over 'row'.
+  /// computed over 'row'. This function is replaced by codegen.
   void UpdateTuple(Tuple* tuple, TupleRow* row);
 
   /// Called on the intermediate tuple of each group after all input rows have been
@@ -135,6 +143,14 @@ class AggregationNode : public ExecNode {
   /// Returns the tuple holding the final aggregate values.
   Tuple* FinalizeTuple(Tuple* tuple, MemPool* pool);
 
+  /// Accessor for the function context of an AggFnEvaluator. Used only in codegen'ed
+  /// version of the UpdateSlot().
+  FunctionContext* IR_ALWAYS_INLINE GetAggFnCtx(int i) const;
+
+  /// Accessor for the expression context of an AggFnEvaluator. Used only in codegen'ed
+  /// version of the UpdateSlot().
+  ExprContext* IR_ALWAYS_INLINE GetAggExprCtx(int i) const;
+
   /// Do the aggregation for all tuple rows in the batch
   void ProcessRowBatchNoGrouping(RowBatch* batch);
   void ProcessRowBatchWithGrouping(RowBatch* batch);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6cc296ec/be/src/exec/hash-table-ir.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hash-table-ir.cc b/be/src/exec/hash-table-ir.cc
index ce9c317..a702736 100644
--- a/be/src/exec/hash-table-ir.cc
+++ b/be/src/exec/hash-table-ir.cc
@@ -23,4 +23,8 @@ using namespace impala;
 
 uint32_t HashTableCtx::GetHashSeed() const { return seeds_[level_]; }
 
+ExprContext* HashTableCtx::GetBuildExprCtx(int i) const { return build_expr_ctxs_[i]; }
+
+ExprContext* HashTableCtx::GetProbeExprCtx(int i) const { return probe_expr_ctxs_[i]; }
+
 #endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6cc296ec/be/src/exec/hash-table.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hash-table.cc b/be/src/exec/hash-table.cc
index dfa700e..0d780b9 100644
--- a/be/src/exec/hash-table.cc
+++ b/be/src/exec/hash-table.cc
@@ -183,8 +183,8 @@ bool HashTableCtx::EvalRow(const TupleRow* row, const vector<ExprContext*>& ctxs
   return has_null;
 }
 
-uint32_t HashTableCtx::HashVariableLenRow(
-    const uint8_t* expr_values, const uint8_t* expr_values_null) const {
+uint32_t HashTableCtx::HashVariableLenRow(const uint8_t* expr_values,
+    const uint8_t* expr_values_null) const {
   uint32_t hash = seeds_[level_];
   int var_result_offset = expr_values_cache_.var_result_offset();
   // Hash the non-var length portions (if there are any)
@@ -699,30 +699,36 @@ Status HashTableCtx::CodegenEvalRow(RuntimeState* state, bool build, Function**
   RETURN_IF_ERROR(state->GetCodegen(&codegen));
 
   // Get types to generate function prototype
-  Type* tuple_row_type = codegen->GetType(TupleRow::LLVM_CLASS_NAME);
-  DCHECK(tuple_row_type != NULL);
-  PointerType* tuple_row_ptr_type = PointerType::get(tuple_row_type, 0);
-
   Type* this_type = codegen->GetType(HashTableCtx::LLVM_CLASS_NAME);
   DCHECK(this_type != NULL);
-  PointerType* this_ptr_type = PointerType::get(this_type, 0);
+  PointerType* this_ptr_type = codegen->GetPtrType(this_type);
+  Type* tuple_row_type = codegen->GetType(TupleRow::LLVM_CLASS_NAME);
+  DCHECK(tuple_row_type != NULL);
+  PointerType* tuple_row_ptr_type = codegen->GetPtrType(tuple_row_type);
   LlvmCodeGen::FnPrototype prototype(codegen, build ? "EvalBuildRow" : "EvalProbeRow",
       codegen->GetType(TYPE_BOOLEAN));
   prototype.AddArgument(LlvmCodeGen::NamedVariable("this_ptr", this_ptr_type));
   prototype.AddArgument(LlvmCodeGen::NamedVariable("row", tuple_row_ptr_type));
   prototype.AddArgument(LlvmCodeGen::NamedVariable("expr_values", codegen->ptr_type()));
-  prototype.AddArgument(
-      LlvmCodeGen::NamedVariable("expr_values_null", codegen->ptr_type()));
+  prototype.AddArgument(LlvmCodeGen::NamedVariable("expr_values_null",
+      codegen->ptr_type()));
 
   LLVMContext& context = codegen->context();
   LlvmCodeGen::LlvmBuilder builder(context);
   Value* args[4];
   *fn = prototype.GeneratePrototype(&builder, args);
+  Value* this_ptr = args[0];
   Value* row = args[1];
   Value* expr_values = args[2];
   Value* expr_values_null = args[3];
   Value* has_null = codegen->false_value();
 
+  IRFunction::Type get_expr_ctx_fn_name = build ?
+      IRFunction::HASH_TABLE_GET_BUILD_EXPR_CTX :
+      IRFunction::HASH_TABLE_GET_PROBE_EXPR_CTX;
+  Function* get_expr_ctx_fn = codegen->GetFunction(get_expr_ctx_fn_name, false);
+  DCHECK(get_expr_ctx_fn != NULL);
+
   for (int i = 0; i < ctxs.size(); ++i) {
     // TODO: refactor this to somewhere else?  This is not hash table specific except for
     // the null handling bit and would be used for anyone that needs to materialize a
@@ -748,8 +754,8 @@ Status HashTableCtx::CodegenEvalRow(RuntimeState* state, bool build, Function**
           status.GetDetail()));
     }
 
-    Value* ctx_arg = codegen->CastPtrToLlvmPtr(
-        codegen->GetPtrType(ExprContext::LLVM_CLASS_NAME), ctxs[i]);
+    Value* get_expr_ctx_args[] = { this_ptr, codegen->GetIntConstant(TYPE_INT, i) };
+    Value* ctx_arg = builder.CreateCall(get_expr_ctx_fn, get_expr_ctx_args, "expr_ctx");
     Value* expr_fn_args[] = { ctx_arg, row };
     CodegenAnyVal result = CodegenAnyVal::CreateCallWrapped(
         codegen, &builder, ctxs[i]->root()->type(), expr_fn, expr_fn_args, "result");
@@ -845,7 +851,7 @@ Status HashTableCtx::CodegenHashRow(RuntimeState* state, bool use_murmur, Functi
   // Get types to generate function prototype
   Type* this_type = codegen->GetType(HashTableCtx::LLVM_CLASS_NAME);
   DCHECK(this_type != NULL);
-  PointerType* this_ptr_type = PointerType::get(this_type, 0);
+  PointerType* this_ptr_type = codegen->GetPtrType(this_type);
 
   LlvmCodeGen::FnPrototype prototype(
       codegen, (use_murmur ? "MurmurHashRow" : "HashRow"), codegen->GetType(TYPE_INT));
@@ -1050,13 +1056,13 @@ Status HashTableCtx::CodegenEquals(RuntimeState* state, bool force_null_equality
   LlvmCodeGen* codegen;
   RETURN_IF_ERROR(state->GetCodegen(&codegen));
   // Get types to generate function prototype
+  Type* this_type = codegen->GetType(HashTableCtx::LLVM_CLASS_NAME);
+  DCHECK(this_type != NULL);
+  PointerType* this_ptr_type = codegen->GetPtrType(this_type);
   Type* tuple_row_type = codegen->GetType(TupleRow::LLVM_CLASS_NAME);
   DCHECK(tuple_row_type != NULL);
-  PointerType* tuple_row_ptr_type = PointerType::get(tuple_row_type, 0);
+  PointerType* tuple_row_ptr_type = codegen->GetPtrType(tuple_row_type);
 
-  Type* this_type = codegen->GetType(HashTableCtx::LLVM_CLASS_NAME);
-  DCHECK(this_type != NULL);
-  PointerType* this_ptr_type = PointerType::get(this_type, 0);
   LlvmCodeGen::FnPrototype prototype(codegen, "Equals", codegen->GetType(TYPE_BOOLEAN));
   prototype.AddArgument(LlvmCodeGen::NamedVariable("this_ptr", this_ptr_type));
   prototype.AddArgument(LlvmCodeGen::NamedVariable("row", tuple_row_ptr_type));
@@ -1068,10 +1074,15 @@ Status HashTableCtx::CodegenEquals(RuntimeState* state, bool force_null_equality
   LlvmCodeGen::LlvmBuilder builder(context);
   Value* args[4];
   *fn = prototype.GeneratePrototype(&builder, args);
+  Value* this_ptr = args[0];
   Value* row = args[1];
   Value* expr_values = args[2];
   Value* expr_values_null = args[3];
 
+  Function* get_expr_ctx_fn =
+      codegen->GetFunction(IRFunction::HASH_TABLE_GET_BUILD_EXPR_CTX, false);
+  DCHECK(get_expr_ctx_fn != NULL);
+
   BasicBlock* false_block = BasicBlock::Create(context, "false_block", *fn);
   for (int i = 0; i < build_expr_ctxs_.size(); ++i) {
     BasicBlock* null_block = BasicBlock::Create(context, "null", *fn);
@@ -1088,8 +1099,11 @@ Status HashTableCtx::CodegenEquals(RuntimeState* state, bool force_null_equality
           status.GetDetail()));
     }
 
-    Value* ctx_arg = codegen->CastPtrToLlvmPtr(
-        codegen->GetPtrType(ExprContext::LLVM_CLASS_NAME), build_expr_ctxs_[i]);
+    // Load ExprContext* from 'build_expr_ctxs_'.
+    Value* get_expr_ctx_args[] = { this_ptr, codegen->GetIntConstant(TYPE_INT, i) };
+    Value* ctx_arg = builder.CreateCall(get_expr_ctx_fn, get_expr_ctx_args, "expr_ctx");
+
+    // Evaluate the expression.
     Value* expr_fn_args[] = { ctx_arg, row };
     CodegenAnyVal result = CodegenAnyVal::CreateCallWrapped(codegen, &builder,
         build_expr_ctxs_[i]->root()->type(), expr_fn, expr_fn_args, "result");

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6cc296ec/be/src/exec/hash-table.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hash-table.h b/be/src/exec/hash-table.h
index 58078ad..fead1f7 100644
--- a/be/src/exec/hash-table.h
+++ b/be/src/exec/hash-table.h
@@ -300,7 +300,9 @@ class HashTableCtx {
     uint32_t ALWAYS_INLINE CurExprValuesHash() const { return *cur_expr_values_hash_; }
 
     /// Sets the hash values for the current row.
-    void ALWAYS_INLINE SetCurExprValuesHash(uint32_t hash) { *cur_expr_values_hash_ = hash; }
+    void ALWAYS_INLINE SetCurExprValuesHash(uint32_t hash) {
+      *cur_expr_values_hash_ = hash;
+    }
 
     /// Returns a pointer to the expression value at 'expr_idx' in 'expr_values'.
     uint8_t* ExprValuePtr(uint8_t* expr_values, int expr_idx) const;
@@ -410,19 +412,19 @@ class HashTableCtx {
   uint32_t Hash(const void* input, int len, uint32_t hash) const;
 
   /// Evaluate 'row' over build exprs, storing values into 'expr_values' and nullness into
-  /// 'expr_values_null'. This will be replaced by codegen. We do not want this
-  /// function inlined when cross compiled because we need to be able to differentiate
-  /// between EvalBuildRow and EvalProbeRow by name and the build/probe exprs are baked
-  /// into the codegen'd function.
-  bool IR_NO_INLINE EvalBuildRow(
-      const TupleRow* row, uint8_t* expr_values, uint8_t* expr_values_null) {
+  /// 'expr_values_null'. This will be replaced by codegen. We do not want this function
+  /// inlined when cross compiled because we need to be able to differentiate between
+  /// EvalBuildRow and EvalProbeRow by name and the build/probe exprs are baked into the
+  /// codegen'd function.
+  bool IR_NO_INLINE EvalBuildRow(const TupleRow* row, uint8_t* expr_values,
+      uint8_t* expr_values_null) {
     return EvalRow(row, build_expr_ctxs_, expr_values, expr_values_null);
   }
 
   /// Evaluate 'row' over probe exprs, storing the values into 'expr_values' and nullness
   /// into 'expr_values_null'. This will be replaced by codegen.
-  bool IR_NO_INLINE EvalProbeRow(
-      const TupleRow* row, uint8_t* expr_values, uint8_t* expr_values_null) {
+  bool IR_NO_INLINE EvalProbeRow(const TupleRow* row, uint8_t* expr_values,
+      uint8_t* expr_values_null) {
     return EvalRow(row, probe_expr_ctxs_, expr_values, expr_values_null);
   }
 
@@ -454,12 +456,16 @@ class HashTableCtx {
   }
 
   /// Cross-compiled function to access member variables used in CodegenHashRow().
-  uint32_t GetHashSeed() const;
+  uint32_t IR_ALWAYS_INLINE GetHashSeed() const;
 
   /// Functions to be replaced by codegen to specialize the hash table.
   bool IR_NO_INLINE stores_nulls() const { return stores_nulls_; }
   bool IR_NO_INLINE finds_some_nulls() const { return finds_some_nulls_; }
 
+  /// Cross-compiled function to access the build/probe expression context.
+  ExprContext* IR_ALWAYS_INLINE GetBuildExprCtx(int i) const;
+  ExprContext* IR_ALWAYS_INLINE GetProbeExprCtx(int i) const;
+
   const std::vector<ExprContext*>& build_expr_ctxs_;
   const std::vector<ExprContext*>& probe_expr_ctxs_;
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6cc296ec/be/src/exec/partitioned-aggregation-node-ir.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-aggregation-node-ir.cc b/be/src/exec/partitioned-aggregation-node-ir.cc
index 194f6c4..ed95844 100644
--- a/be/src/exec/partitioned-aggregation-node-ir.cc
+++ b/be/src/exec/partitioned-aggregation-node-ir.cc
@@ -26,6 +26,10 @@
 
 using namespace impala;
 
+ExprContext* PartitionedAggregationNode::GetAggExprContext(int i) const {
+  return agg_expr_ctxs_[i];
+}
+
 Status PartitionedAggregationNode::ProcessBatchNoGrouping(RowBatch* batch) {
   Tuple* output_tuple = singleton_output_tuple_;
   FOREACH_ROW(batch, 0, batch_iter) {
@@ -202,7 +206,7 @@ Status PartitionedAggregationNode::ProcessBatchStreaming(bool needs_serialize,
           DCHECK(!process_batch_status_.ok());
           return process_batch_status_;
         }
-        UpdateTuple(&agg_fn_ctxs_[0], intermediate_tuple, in_row, false);
+        UpdateTuple(&agg_fn_ctxs_[0], intermediate_tuple, in_row);
         out_batch_iterator.Get()->SetTuple(0, intermediate_tuple);
         out_batch_iterator.Next();
         out_batch->CommitLastRow();
@@ -250,7 +254,7 @@ bool PartitionedAggregationNode::TryAddToHashTable(
     }
   }
 
-  UpdateTuple(&partition->agg_fn_ctxs[0], intermediate_tuple, in_row, false);
+  UpdateTuple(&partition->agg_fn_ctxs[0], intermediate_tuple, in_row);
   return true;
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6cc296ec/be/src/exec/partitioned-aggregation-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-aggregation-node.cc b/be/src/exec/partitioned-aggregation-node.cc
index eb5addc..ba2d9f7 100644
--- a/be/src/exec/partitioned-aggregation-node.cc
+++ b/be/src/exec/partitioned-aggregation-node.cc
@@ -153,6 +153,18 @@ Status PartitionedAggregationNode::Init(const TPlanNode& tnode, RuntimeState* st
     RETURN_IF_ERROR(AggFnEvaluator::Create(
         pool_, tnode.agg_node.aggregate_functions[i], &evaluator));
     aggregate_evaluators_.push_back(evaluator);
+    ExprContext* agg_expr_ctx;
+    if (evaluator->input_expr_ctxs().size() == 1) {
+      agg_expr_ctx = evaluator->input_expr_ctxs()[0];
+    } else {
+      // CodegenUpdateSlot() only supports aggregate operators with exactly one
+      // ExprContext, so it doesn't support operators such as group_concat. There are
+      // also aggregate operators with no ExprContext (e.g. count(*)). In both cases,
+      // 'agg_expr_ctxs_' will contain NULL for that entry.
+      DCHECK(evaluator->agg_op() == AggFnEvaluator::OTHER || evaluator->is_count_star());
+      agg_expr_ctx = NULL;
+    }
+    agg_expr_ctxs_.push_back(agg_expr_ctx);
   }
   return Status::OK();
 }
@@ -696,6 +708,7 @@ void PartitionedAggregationNode::Close(RuntimeState* state) {
   for (int i = 0; i < aggregate_evaluators_.size(); ++i) {
     aggregate_evaluators_[i]->Close(state);
   }
+  agg_expr_ctxs_.clear();
   for (int i = 0; i < agg_fn_ctxs_.size(); ++i) {
     agg_fn_ctxs_[i]->impl()->Close();
   }
@@ -1407,23 +1420,28 @@ Status PartitionedAggregationNode::QueryMaintenance(RuntimeState* state) {
 }
 
 // IR Generation for updating a single aggregation slot. Signature is:
-// void UpdateSlot(FunctionContext* fn_ctx, AggTuple* agg_tuple, char** row)
+// void UpdateSlot(FunctionContext* agg_fn_ctx, ExprContext* agg_expr_ctx,
+//     AggTuple* agg_tuple, char** row)
 //
 // The IR for sum(double_col) is:
-// define void @UpdateSlot(%"class.impala_udf::FunctionContext"* %fn_ctx,
-//                         { i8, double }* %agg_tuple,
-//                         %"class.impala::TupleRow"* %row) #20 {
+//
+// define void @UpdateSlot(%"class.impala_udf::FunctionContext"* %agg_fn_ctx,
+//                         %"class.impala::ExprContext"* %agg_expr_ctx,
+//                         { i8, [7 x i8], double }* %agg_tuple,
+//                         %"class.impala::TupleRow"* %row) #34 {
+//
 // entry:
-//   %src = call { i8, double } @GetSlotRef(%"class.impala::ExprContext"* inttoptr
-//     (i64 128241264 to %"class.impala::ExprContext"*), %"class.impala::TupleRow"* %row)
+//   %src = call { i8, double } @GetSlotRef(%"class.impala::ExprContext"* %agg_expr_ctx,
+//       %"class.impala::TupleRow"* %row)
 //   %0 = extractvalue { i8, double } %src, 0
 //   %is_null = trunc i8 %0 to i1
 //   br i1 %is_null, label %ret, label %src_not_null
 //
 // src_not_null:                                     ; preds = %entry
-//   %dst_slot_ptr = getelementptr inbounds { i8, double }* %agg_tuple, i32 0, i32 1
-//   call void @SetNotNull({ i8, double }* %agg_tuple)
-//   %dst_val = load double* %dst_slot_ptr
+//   %dst_slot_ptr = getelementptr inbounds { i8, [7 x i8], double },
+//       { i8, [7 x i8], double }* %agg_tuple, i32 0, i32 2
+//   call void @SetNotNull({ i8, [7 x i8], double }* %agg_tuple)
+//   %dst_val = load double, double* %dst_slot_ptr
 //   %val = extractvalue { i8, double } %src, 1
 //   %1 = fadd double %dst_val, %val
 //   store double %1, double* %dst_slot_ptr
@@ -1434,48 +1452,51 @@ Status PartitionedAggregationNode::QueryMaintenance(RuntimeState* state) {
 // }
 //
 // The IR for ndv(double_col) is:
-// define void @UpdateSlot(%"class.impala_udf::FunctionContext"* %fn_ctx,
-//                         { i8, %"struct.impala::StringValue" }* %agg_tuple,
-//                         %"class.impala::TupleRow"* %row) #20 {
+//
+// define void @UpdateSlot(%"class.impala_udf::FunctionContext"* %agg_fn_ctx,
+//                         %"class.impala::ExprContext"* %agg_expr_ctx,
+//                         { i8, [7 x i8], %"struct.impala::StringValue" }* %agg_tuple,
+//                         %"class.impala::TupleRow"* %row) #34 {
 // entry:
 //   %dst_lowered_ptr = alloca { i64, i8* }
 //   %src_lowered_ptr = alloca { i8, double }
-//   %src = call { i8, double } @GetSlotRef(%"class.impala::ExprContext"* inttoptr
-//     (i64 120530832 to %"class.impala::ExprContext"*), %"class.impala::TupleRow"* %row)
+//   %src = call { i8, double } @GetSlotRef(%"class.impala::ExprContext"* %agg_expr_ctx,
+//       %"class.impala::TupleRow"* %row)
 //   %0 = extractvalue { i8, double } %src, 0
 //   %is_null = trunc i8 %0 to i1
 //   br i1 %is_null, label %ret, label %src_not_null
 //
 // src_not_null:                                     ; preds = %entry
-//   %dst_slot_ptr = getelementptr inbounds
-//     { i8, %"struct.impala::StringValue" }* %agg_tuple, i32 0, i32 1
-//   call void @SetNotNull({ i8, %"struct.impala::StringValue" }* %agg_tuple)
-//   %dst_val = load %"struct.impala::StringValue"* %dst_slot_ptr
+//   %dst_slot_ptr = getelementptr inbounds { i8, [7 x i8], %"struct.impala::StringValue" },
+//       { i8, [7 x i8], %"struct.impala::StringValue" }* %agg_tuple, i32 0, i32 2
+//   call void @SetNotNull({ i8, [7 x i8], %"struct.impala::StringValue" }* %agg_tuple)
+//   %dst_val =
+//       load %"struct.impala::StringValue", %"struct.impala::StringValue"* %dst_slot_ptr
 //   store { i8, double } %src, { i8, double }* %src_lowered_ptr
-//   %src_unlowered_ptr = bitcast { i8, double }* %src_lowered_ptr
-//                        to %"struct.impala_udf::DoubleVal"*
+//   %src_unlowered_ptr =
+//       bitcast { i8, double }* %src_lowered_ptr to %"struct.impala_udf::DoubleVal"*
 //   %ptr = extractvalue %"struct.impala::StringValue" %dst_val, 0
-//   %dst_stringval = insertvalue { i64, i8* } zeroinitializer, i8* %ptr, 1
+//   %dst = insertvalue { i64, i8* } zeroinitializer, i8* %ptr, 1
 //   %len = extractvalue %"struct.impala::StringValue" %dst_val, 1
-//   %1 = extractvalue { i64, i8* } %dst_stringval, 0
+//   %1 = extractvalue { i64, i8* } %dst, 0
 //   %2 = zext i32 %len to i64
 //   %3 = shl i64 %2, 32
 //   %4 = and i64 %1, 4294967295
 //   %5 = or i64 %4, %3
-//   %dst_stringval1 = insertvalue { i64, i8* } %dst_stringval, i64 %5, 0
-//   store { i64, i8* } %dst_stringval1, { i64, i8* }* %dst_lowered_ptr
-//   %dst_unlowered_ptr = bitcast { i64, i8* }* %dst_lowered_ptr
-//                        to %"struct.impala_udf::StringVal"*
-//   call void @HllUpdate(%"class.impala_udf::FunctionContext"* %fn_ctx,
+//   %dst1 = insertvalue { i64, i8* } %dst, i64 %5, 0
+//   store { i64, i8* } %dst1, { i64, i8* }* %dst_lowered_ptr
+//   %dst_unlowered_ptr =
+//       bitcast { i64, i8* }* %dst_lowered_ptr to %"struct.impala_udf::StringVal"*
+//   call void @HllUpdate(%"class.impala_udf::FunctionContext"* %agg_fn_ctx,
 //                        %"struct.impala_udf::DoubleVal"* %src_unlowered_ptr,
 //                        %"struct.impala_udf::StringVal"* %dst_unlowered_ptr)
-//   %anyval_result = load { i64, i8* }* %dst_lowered_ptr
-//   %6 = extractvalue { i64, i8* } %anyval_result, 1
-//   %7 = insertvalue %"struct.impala::StringValue" zeroinitializer, i8* %6, 0
-//   %8 = extractvalue { i64, i8* } %anyval_result, 0
-//   %9 = ashr i64 %8, 32
-//   %10 = trunc i64 %9 to i32
-//   %11 = insertvalue %"struct.impala::StringValue" %7, i32 %10, 1
+//   %anyval_result = load { i64, i8* }, { i64, i8* }* %dst_lowered_ptr
+//   %6 = extractvalue { i64, i8* } %anyval_result, 0
+//   %7 = ashr i64 %6, 32
+//   %8 = trunc i64 %7 to i32
+//   %9 = insertvalue %"struct.impala::StringValue" zeroinitializer, i32 %8, 1
+//   %10 = extractvalue { i64, i8* } %anyval_result, 1
+//   %11 = insertvalue %"struct.impala::StringValue" %9, i8* %10, 0
 //   store %"struct.impala::StringValue" %11, %"struct.impala::StringValue"* %dst_slot_ptr
 //   br label %ret
 //
@@ -1487,53 +1508,56 @@ Status PartitionedAggregationNode::CodegenUpdateSlot(
   LlvmCodeGen* codegen;
   RETURN_IF_ERROR(state_->GetCodegen(&codegen));
 
+  // TODO: Fix this DCHECK and Init() once CodegenUpdateSlot() can handle AggFnEvaluator
+  // with multiple input expressions (e.g. group_concat).
   DCHECK_EQ(evaluator->input_expr_ctxs().size(), 1);
-  ExprContext* input_expr_ctx = evaluator->input_expr_ctxs()[0];
-  Expr* input_expr = input_expr_ctx->root();
+  ExprContext* agg_expr_ctx = evaluator->input_expr_ctxs()[0];
+  Expr* agg_expr = agg_expr_ctx->root();
 
   // TODO: implement timestamp
-  if (input_expr->type().type == TYPE_TIMESTAMP &&
+  if (agg_expr->type().type == TYPE_TIMESTAMP &&
       evaluator->agg_op() != AggFnEvaluator::AVG) {
     return Status("PartitionedAggregationNode::CodegenUpdateSlot(): timestamp input type "
         "NYI");
   }
 
   Function* agg_expr_fn;
-  RETURN_IF_ERROR(input_expr->GetCodegendComputeFn(state_, &agg_expr_fn));
+  RETURN_IF_ERROR(agg_expr->GetCodegendComputeFn(state_, &agg_expr_fn));
 
   PointerType* fn_ctx_type =
       codegen->GetPtrType(FunctionContextImpl::LLVM_FUNCTIONCONTEXT_NAME);
+  PointerType* expr_ctx_type = codegen->GetPtrType(ExprContext::LLVM_CLASS_NAME);
   StructType* tuple_struct = intermediate_tuple_desc_->GetLlvmStruct(codegen);
   if (tuple_struct == NULL) {
     return Status("PartitionedAggregationNode::CodegenUpdateSlot(): failed to generate "
         "intermediate tuple desc");
   }
-  PointerType* tuple_ptr_type = PointerType::get(tuple_struct, 0);
+  PointerType* tuple_ptr_type = codegen->GetPtrType(tuple_struct);
   PointerType* tuple_row_ptr_type = codegen->GetPtrType(TupleRow::LLVM_CLASS_NAME);
 
   // Create UpdateSlot prototype
   LlvmCodeGen::FnPrototype prototype(codegen, "UpdateSlot", codegen->void_type());
-  prototype.AddArgument(LlvmCodeGen::NamedVariable("fn_ctx", fn_ctx_type));
+  prototype.AddArgument(LlvmCodeGen::NamedVariable("agg_fn_ctx", fn_ctx_type));
+  prototype.AddArgument(LlvmCodeGen::NamedVariable("agg_expr_ctx", expr_ctx_type));
   prototype.AddArgument(LlvmCodeGen::NamedVariable("agg_tuple", tuple_ptr_type));
   prototype.AddArgument(LlvmCodeGen::NamedVariable("row", tuple_row_ptr_type));
 
   LlvmCodeGen::LlvmBuilder builder(codegen->context());
-  Value* args[3];
+  Value* args[4];
   *fn = prototype.GeneratePrototype(&builder, &args[0]);
-  Value* fn_ctx_arg = args[0];
-  Value* agg_tuple_arg = args[1];
-  Value* row_arg = args[2];
+  Value* agg_fn_ctx_arg = args[0];
+  Value* agg_expr_ctx_arg = args[1];
+  Value* agg_tuple_arg = args[2];
+  Value* row_arg = args[3];
 
   BasicBlock* src_not_null_block =
       BasicBlock::Create(codegen->context(), "src_not_null", *fn);
   BasicBlock* ret_block = BasicBlock::Create(codegen->context(), "ret", *fn);
 
   // Call expr function to get src slot value
-  Value* expr_ctx = codegen->CastPtrToLlvmPtr(
-      codegen->GetPtrType(ExprContext::LLVM_CLASS_NAME), input_expr_ctx);
-  Value* agg_expr_fn_args[] = { expr_ctx, row_arg };
+  Value* agg_expr_fn_args[] = { agg_expr_ctx_arg, row_arg };
   CodegenAnyVal src = CodegenAnyVal::CreateCallWrapped(
-      codegen, &builder, input_expr->type(), agg_expr_fn, agg_expr_fn_args, "src");
+      codegen, &builder, agg_expr->type(), agg_expr_fn, agg_expr_fn_args, "src");
 
   Value* src_is_null = src.GetIsNull();
   builder.CreateCondBr(src_is_null, ret_block, src_not_null_block);
@@ -1597,7 +1621,7 @@ Status PartitionedAggregationNode::CodegenUpdateSlot(
       // Clone and replace constants.
       ir_fn = codegen->CloneFunction(ir_fn);
       vector<FunctionContext::TypeDesc> arg_types;
-      arg_types.push_back(AnyValUtil::ColumnTypeToTypeDesc(input_expr->type()));
+      arg_types.push_back(AnyValUtil::ColumnTypeToTypeDesc(agg_expr->type()));
       Expr::InlineConstants(AnyValUtil::ColumnTypeToTypeDesc(dst_type), arg_types,
           codegen, ir_fn);
 
@@ -1606,7 +1630,7 @@ Status PartitionedAggregationNode::CodegenUpdateSlot(
           *fn, LlvmCodeGen::NamedVariable("src_lowered_ptr", src.value()->getType()));
       builder.CreateStore(src.value(), src_lowered_ptr);
       Type* unlowered_ptr_type =
-          CodegenAnyVal::GetUnloweredPtrType(codegen, input_expr->type());
+          CodegenAnyVal::GetUnloweredPtrType(codegen, agg_expr->type());
       Value* src_unlowered_ptr =
           builder.CreateBitCast(src_lowered_ptr, unlowered_ptr_type, "src_unlowered_ptr");
 
@@ -1624,7 +1648,7 @@ Status PartitionedAggregationNode::CodegenUpdateSlot(
 
       // Call 'ir_fn'
       builder.CreateCall(ir_fn,
-          ArrayRef<Value*>({fn_ctx_arg, src_unlowered_ptr, dst_unlowered_ptr}));
+          ArrayRef<Value*>({agg_fn_ctx_arg, src_unlowered_ptr, dst_unlowered_ptr}));
 
       // Convert StringVal intermediate 'dst_arg' back to StringValue
       Value* anyval_result = builder.CreateLoad(dst_lowered_ptr, "anyval_result");
@@ -1656,28 +1680,41 @@ Status PartitionedAggregationNode::CodegenUpdateSlot(
 // For the query:
 // select count(*), count(int_col), sum(double_col) the IR looks like:
 //
-
 // ; Function Attrs: alwaysinline
 // define void @UpdateTuple(%"class.impala::PartitionedAggregationNode"* %this_ptr,
 //                          %"class.impala_udf::FunctionContext"** %agg_fn_ctxs,
 //                          %"class.impala::Tuple"* %tuple,
 //                          %"class.impala::TupleRow"* %row,
-//                          i1 %is_merge) #20 {
+//                          i1 %is_merge) #34 {
 // entry:
-//   %tuple1 = bitcast %"class.impala::Tuple"* %tuple to { i8, i64, i64, double }*
-//   %src_slot = getelementptr inbounds { i8, i64, i64, double }* %tuple1, i32 0, i32 1
-//   %count_star_val = load i64* %src_slot
+//   %tuple1 =
+//       bitcast %"class.impala::Tuple"* %tuple to { i8, [7 x i8], i64, i64, double }*
+//   %src_slot = getelementptr inbounds { i8, [7 x i8], i64, i64, double },
+//       { i8, [7 x i8], i64, i64, double }* %tuple1, i32 0, i32 2
+//   %count_star_val = load i64, i64* %src_slot
 //   %count_star_inc = add i64 %count_star_val, 1
 //   store i64 %count_star_inc, i64* %src_slot
-//   %0 = getelementptr %"class.impala_udf::FunctionContext"** %agg_fn_ctxs, i32 1
-//   %fn_ctx = load %"class.impala_udf::FunctionContext"** %0
-//   call void @UpdateSlot(%"class.impala_udf::FunctionContext"* %fn_ctx,
-//                         { i8, i64, i64, double }* %tuple1,
+//   %0 = getelementptr %"class.impala_udf::FunctionContext"*,
+//       %"class.impala_udf::FunctionContext"** %agg_fn_ctxs, i32 1
+//   %agg_fn_ctx = load %"class.impala_udf::FunctionContext"*,
+//       %"class.impala_udf::FunctionContext"** %0
+//   %1 = call %"class.impala::ExprContext"*
+//       @_ZNK6impala26PartitionedAggregationNode17GetAggExprContextEi(
+//           %"class.impala::PartitionedAggregationNode"* %this_ptr, i32 1)
+//   call void @UpdateSlot(%"class.impala_udf::FunctionContext"* %agg_fn_ctx,
+//                         %"class.impala::ExprContext"* %1,
+//                         { i8, [7 x i8], i64, i64, double }* %tuple1,
 //                         %"class.impala::TupleRow"* %row)
-//   %1 = getelementptr %"class.impala_udf::FunctionContext"** %agg_fn_ctxs, i32 2
-//   %fn_ctx2 = load %"class.impala_udf::FunctionContext"** %1
-//   call void @UpdateSlot5(%"class.impala_udf::FunctionContext"* %fn_ctx2,
-//                          { i8, i64, i64, double }* %tuple1,
+//   %2 = getelementptr %"class.impala_udf::FunctionContext"*,
+//       %"class.impala_udf::FunctionContext"** %agg_fn_ctxs, i32 2
+//   %agg_fn_ctx2 = load %"class.impala_udf::FunctionContext"*,
+//       %"class.impala_udf::FunctionContext"** %2
+//   %3 = call %"class.impala::ExprContext"*
+//       @_ZNK6impala26PartitionedAggregationNode17GetAggExprContextEi(
+//           %"class.impala::PartitionedAggregationNode"* %this_ptr, i32 2)
+//   call void @UpdateSlot.3(%"class.impala_udf::FunctionContext"* %agg_fn_ctx2,
+//                          %"class.impala::ExprContext"* %3,
+//                          { i8, [7 x i8], i64, i64, double }* %tuple1,
 //                          %"class.impala::TupleRow"* %row)
 //   ret void
 // }
@@ -1726,13 +1763,13 @@ Status PartitionedAggregationNode::CodegenUpdateTuple(Function** fn) {
   Type* tuple_type = codegen->GetType(Tuple::LLVM_CLASS_NAME);
   Type* tuple_row_type = codegen->GetType(TupleRow::LLVM_CLASS_NAME);
 
-  PointerType* agg_node_ptr_type = agg_node_type->getPointerTo();
-  PointerType* fn_ctx_ptr_ptr_type = fn_ctx_type->getPointerTo()->getPointerTo();
-  PointerType* tuple_ptr_type = tuple_type->getPointerTo();
-  PointerType* tuple_row_ptr_type = tuple_row_type->getPointerTo();
+  PointerType* agg_node_ptr_type = codegen->GetPtrType(agg_node_type);
+  PointerType* fn_ctx_ptr_ptr_type = codegen->GetPtrPtrType(fn_ctx_type);
+  PointerType* tuple_ptr_type = codegen->GetPtrType(tuple_type);
+  PointerType* tuple_row_ptr_type = codegen->GetPtrType(tuple_row_type);
 
   StructType* tuple_struct = intermediate_tuple_desc_->GetLlvmStruct(codegen);
-  PointerType* tuple_ptr = PointerType::get(tuple_struct, 0);
+  PointerType* tuple_ptr = codegen->GetPtrType(tuple_struct);
   LlvmCodeGen::FnPrototype prototype(codegen, "UpdateTuple", codegen->void_type());
   prototype.AddArgument(LlvmCodeGen::NamedVariable("this_ptr", agg_node_ptr_type));
   prototype.AddArgument(LlvmCodeGen::NamedVariable("agg_fn_ctxs", fn_ctx_ptr_ptr_type));
@@ -1743,7 +1780,7 @@ Status PartitionedAggregationNode::CodegenUpdateTuple(Function** fn) {
   LlvmCodeGen::LlvmBuilder builder(codegen->context());
   Value* args[5];
   *fn = prototype.GeneratePrototype(&builder, &args[0]);
-
+  Value* this_arg = args[0];
   Value* agg_fn_ctxs_arg = args[1];
   Value* tuple_arg = args[2];
   Value* row_arg = args[3];
@@ -1752,6 +1789,10 @@ Status PartitionedAggregationNode::CodegenUpdateTuple(Function** fn) {
   // TODO: get rid of this by using right type in function signature
   tuple_arg = builder.CreateBitCast(tuple_arg, tuple_ptr, "tuple");
 
+  Function* get_expr_ctx_fn =
+      codegen->GetFunction(IRFunction::PART_AGG_NODE_GET_EXPR_CTX, false);
+  DCHECK(get_expr_ctx_fn != NULL);
+
   // Loop over each expr and generate the IR for that slot.  If the expr is not
   // count(*), generate a helper IR function to update the slot and call that.
   j = grouping_expr_ctxs_.size();
@@ -1770,9 +1811,14 @@ Status PartitionedAggregationNode::CodegenUpdateTuple(Function** fn) {
     } else {
       Function* update_slot_fn;
       RETURN_IF_ERROR(CodegenUpdateSlot(evaluator, slot_desc, &update_slot_fn));
-      Value* fn_ctx_ptr = builder.CreateConstGEP1_32(agg_fn_ctxs_arg, i);
-      Value* fn_ctx = builder.CreateLoad(fn_ctx_ptr, "fn_ctx");
-      builder.CreateCall(update_slot_fn, ArrayRef<Value*>({fn_ctx, tuple_arg, row_arg}));
+      Value* agg_fn_ctx_ptr = builder.CreateConstGEP1_32(agg_fn_ctxs_arg, i);
+      Value* agg_fn_ctx = builder.CreateLoad(agg_fn_ctx_ptr, "agg_fn_ctx");
+      // Call GetAggExprContext() to get the expression context.
+      DCHECK(agg_expr_ctxs_[i] != NULL);
+      Value* get_expr_ctx_args[] = { this_arg, codegen->GetIntConstant(TYPE_INT, i) };
+      Value* agg_expr_ctx = builder.CreateCall(get_expr_ctx_fn, get_expr_ctx_args);
+      Value* update_slot_args[] = { agg_fn_ctx, agg_expr_ctx, tuple_arg, row_arg };
+      builder.CreateCall(update_slot_fn, update_slot_args);
     }
   }
   builder.CreateRetVoid();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6cc296ec/be/src/exec/partitioned-aggregation-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-aggregation-node.h b/be/src/exec/partitioned-aggregation-node.h
index 952dcd7..c766ab2 100644
--- a/be/src/exec/partitioned-aggregation-node.h
+++ b/be/src/exec/partitioned-aggregation-node.h
@@ -193,11 +193,18 @@ class PartitionedAggregationNode : public ExecNode {
   /// are doing a streaming preaggregation.
   bool is_streaming_preagg_;
 
-  /// Contains any evaluators that require the serialize step.
+  /// True if any of the evaluators require the serialize step.
   bool needs_serialize_;
 
+  /// The list of all aggregate operations for this exec node.
   std::vector<AggFnEvaluator*> aggregate_evaluators_;
 
+  /// Cache of the ExprContexts of 'aggregate_evaluators_'. Used in the codegen'ed
+  /// version of UpdateTuple() to avoid loading aggregate_evaluators_[i] at runtime.
+  /// An entry is NULL if the aggregate evaluator is not codegen'ed or there is no Expr
+  /// in the aggregate evaluator (e.g. count(*)).
+  std::vector<ExprContext*> agg_expr_ctxs_;
+
   /// FunctionContext for each aggregate function and backing MemPool. String data
   /// returned by the aggregate functions is allocated via these contexts.
   /// These contexts are only passed to the evaluators in the non-partitioned
@@ -468,9 +475,9 @@ class PartitionedAggregationNode : public ExecNode {
   /// belonging to the same partition independent of whether the agg fn evaluators have
   /// is_merge() == true.
   /// This function is replaced by codegen (which is why we don't use a vector argument
-  /// for agg_fn_ctxs). Any var-len data is allocated from the FunctionContexts.
+  /// for agg_fn_ctxs). Any var-len data is allocated from the FunctionContexts.
   void UpdateTuple(impala_udf::FunctionContext** agg_fn_ctxs, Tuple* tuple, TupleRow* row,
-                   bool is_merge = false);
+      bool is_merge = false);
 
   /// Called on the intermediate tuple of each group after all input rows have been
   /// consumed and aggregated. Computes the final aggregate values to be returned in
@@ -482,7 +489,7 @@ class PartitionedAggregationNode : public ExecNode {
   /// TODO: Coordinate the allocation of new tuples with the release of memory
   /// so as not to make memory consumption blow up.
   Tuple* GetOutputTuple(const std::vector<impala_udf::FunctionContext*>& agg_fn_ctxs,
-                        Tuple* tuple, MemPool* pool);
+      Tuple* tuple, MemPool* pool);
 
   /// Do the aggregation for all tuple rows in the batch when there is no grouping.
   /// This function is replaced by codegen.
@@ -517,6 +524,10 @@ class PartitionedAggregationNode : public ExecNode {
   template<bool AGGREGATED_ROWS>
   Status IR_ALWAYS_INLINE ProcessRow(TupleRow* row, HashTableCtx* ht_ctx);
 
+  /// Accessor for the expression context of an AggFnEvaluator. Used only in the
+  /// codegen'ed version of UpdateTuple().
+  ExprContext* IR_ALWAYS_INLINE GetAggExprContext(int i) const;
+
   /// Create a new intermediate tuple in partition, initialized with row. ht_ctx is
   /// the context for the partition's hash table and hash is the precomputed hash of
   /// the row. The row can be an unaggregated or aggregated row depending on

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6cc296ec/be/src/exprs/agg-fn-evaluator.h
----------------------------------------------------------------------
diff --git a/be/src/exprs/agg-fn-evaluator.h b/be/src/exprs/agg-fn-evaluator.h
index fb39789..cde969c 100644
--- a/be/src/exprs/agg-fn-evaluator.h
+++ b/be/src/exprs/agg-fn-evaluator.h
@@ -189,6 +189,8 @@ class AggFnEvaluator {
   /// intermediate_slot_desc_ if this agg fn has the same intermediate and output type.
   const SlotDescriptor* output_slot_desc_;
 
+  /// Expression contexts for this AggFnEvaluator. Empty if there is no
+  /// expression (e.g. count(*)).
   std::vector<ExprContext*> input_expr_ctxs_;
 
   /// The enum for some of the builtins that still require special cased logic.
@@ -270,7 +272,7 @@ inline void AggFnEvaluator::Init(const std::vector<AggFnEvaluator*>& evaluators,
   }
 }
 inline void AggFnEvaluator::Add(const std::vector<AggFnEvaluator*>& evaluators,
-      const std::vector<FunctionContext*>& fn_ctxs, const TupleRow* src, Tuple* dst) {
+    const std::vector<FunctionContext*>& fn_ctxs, const TupleRow* src, Tuple* dst) {
   DCHECK_EQ(evaluators.size(), fn_ctxs.size());
   for (int i = 0; i < evaluators.size(); ++i) {
     evaluators[i]->Add(fn_ctxs[i], src, dst);