You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2017/08/02 06:28:28 UTC

[5/6] incubator-impala git commit: IMPALA-4674: Part 1: remove old aggs and joins

IMPALA-4674: Part 1: remove old aggs and joins

This is intended to be merged at the same time as Part 2 but is
separated out to make the change more reviewable. Part 2 assumes
that it does not need special logic to handle this mode (e.g.
because the old aggs and joins don't use reservation).

Disable the --enable_partitioned_{aggregation,hash_join} options
and remove all product and test code associated with them.

Change-Id: I5ce2236d37c0ced188a4a81f7e00d4b8ac98e7e9
Reviewed-on: http://gerrit.cloudera.org:8080/7102
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/507bd8be
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/507bd8be
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/507bd8be

Branch: refs/heads/master
Commit: 507bd8be7e5e710b37f845ef6cf23238b426ee0a
Parents: 344c26a
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Tue Jun 6 10:45:15 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Wed Aug 2 01:49:12 2017 +0000

----------------------------------------------------------------------
 be/src/codegen/gen_ir_descriptions.py           |  18 -
 be/src/codegen/impala-ir.cc                     |   3 -
 be/src/exec/CMakeLists.txt                      |   7 -
 be/src/exec/aggregation-node-ir.cc              |  55 --
 be/src/exec/aggregation-node.cc                 | 878 -------------------
 be/src/exec/aggregation-node.h                  | 174 ----
 be/src/exec/blocking-join-node.cc               |  20 +-
 be/src/exec/blocking-join-node.h                |  17 +-
 be/src/exec/exec-node.cc                        |  32 +-
 be/src/exec/hash-join-node-ir.cc                | 140 ---
 be/src/exec/hash-join-node.cc                   | 673 --------------
 be/src/exec/hash-join-node.h                    | 164 ----
 be/src/exec/hdfs-parquet-scanner.cc             |   2 -
 be/src/exec/nested-loop-join-node.cc            |   5 -
 be/src/exec/nested-loop-join-node.h             |   3 -
 be/src/exec/old-hash-table-ir.cc                |  42 -
 be/src/exec/old-hash-table-test.cc              | 337 -------
 be/src/exec/old-hash-table.cc                   | 872 ------------------
 be/src/exec/old-hash-table.h                    | 548 ------------
 be/src/exec/old-hash-table.inline.h             | 189 ----
 be/src/exec/partitioned-hash-join-node.cc       |   5 -
 be/src/exec/partitioned-hash-join-node.h        |   1 -
 be/src/exprs/agg-fn-evaluator.cc                |   1 -
 be/src/runtime/row-batch.cc                     |  51 +-
 be/src/runtime/row-batch.h                      |  17 +-
 be/src/util/backend-gflag-util.cc               |   2 -
 common/thrift/BackendGflags.thrift              |   6 +-
 .../org/apache/impala/planner/JoinNode.java     |   3 +-
 .../apache/impala/service/BackendConfig.java    |   4 -
 .../queries/QueryTest/legacy-joins-aggs.test    |  45 -
 tests/common/environ.py                         |   6 -
 tests/common/skip.py                            |  12 +-
 tests/custom_cluster/test_legacy_joins_aggs.py  |  33 -
 tests/metadata/test_ddl.py                      |   3 +-
 tests/query_test/test_aggregation.py            |  14 +-
 tests/query_test/test_join_queries.py           |   2 -
 tests/query_test/test_mt_dop.py                 |   2 -
 tests/query_test/test_nested_types.py           |   4 -
 tests/query_test/test_runtime_filters.py        |   4 +-
 tests/query_test/test_scanners.py               |   3 -
 tests/query_test/test_tpch_nested_queries.py    |   2 -
 41 files changed, 36 insertions(+), 4363 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/507bd8be/be/src/codegen/gen_ir_descriptions.py
----------------------------------------------------------------------
diff --git a/be/src/codegen/gen_ir_descriptions.py b/be/src/codegen/gen_ir_descriptions.py
index ac36bb5..94dc86a 100755
--- a/be/src/codegen/gen_ir_descriptions.py
+++ b/be/src/codegen/gen_ir_descriptions.py
@@ -43,12 +43,6 @@ options, args = parser.parse_args()
 # the bit code module.
 # TODO: should we work out the mangling rules?
 ir_functions = [
-  ["AGG_NODE_PROCESS_ROW_BATCH_WITH_GROUPING",
-   "_ZN6impala15AggregationNode27ProcessRowBatchWithGroupingEPNS_8RowBatchE"],
-  ["AGG_NODE_PROCESS_ROW_BATCH_NO_GROUPING",
-   "_ZN6impala15AggregationNode25ProcessRowBatchNoGroupingEPNS_8RowBatchE"],
-  ["AGG_NODE_GET_AGG_FN_EVALUATORS",
-   "_ZNK6impala15AggregationNode12agg_fn_evalsEv"],
   ["AGG_FN_EVALUATOR_INPUT_EVALUATORS",
    "_ZNK6impala14AggFnEvaluator11input_evalsEv"],
   ["AGG_FN_EVALUATOR_AGG_FN_CTX",
@@ -104,18 +98,6 @@ ir_functions = [
   ["HASH_CRC", "IrCrcHash"],
   ["HASH_FNV", "IrFnvHash"],
   ["HASH_MURMUR", "IrMurmurHash"],
-  ["HASH_JOIN_PROCESS_BUILD_BATCH",
-   "_ZN6impala12HashJoinNode17ProcessBuildBatchEPNS_8RowBatchE"],
-  ["HASH_JOIN_PROCESS_PROBE_BATCH",
-   "_ZN6impala12HashJoinNode17ProcessProbeBatchEPNS_8RowBatchES2_i"],
-  ["OLD_HASH_TABLE_GET_BUILD_EXPR_EVALUATORS",
-   "_ZNK6impala12OldHashTable16build_expr_evalsEv"],
-  ["OLD_HASH_TABLE_GET_PROBE_EXPR_EVALUATORS",
-   "_ZNK6impala12OldHashTable16probe_expr_evalsEv"],
-  ["OLD_HASH_TABLE_GET_EXPR_VALUES_BUFFER",
-   "_ZNK6impala12OldHashTable18expr_values_bufferEv"],
-  ["OLD_HASH_TABLE_GET_EXPR_VALUE_NULL_BITS",
-   "_ZNK6impala12OldHashTable20expr_value_null_bitsEv"],
   ["PHJ_PROCESS_BUILD_BATCH",
    "_ZN6impala10PhjBuilder17ProcessBuildBatchEPNS_8RowBatchEPNS_12HashTableCtxEb"],
   ["PHJ_PROCESS_PROBE_BATCH_INNER_JOIN",

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/507bd8be/be/src/codegen/impala-ir.cc
----------------------------------------------------------------------
diff --git a/be/src/codegen/impala-ir.cc b/be/src/codegen/impala-ir.cc
index 2500a33..2992849 100644
--- a/be/src/codegen/impala-ir.cc
+++ b/be/src/codegen/impala-ir.cc
@@ -26,13 +26,10 @@
 #pragma clang diagnostic ignored "-Wheader-hygiene"
 
 #include "codegen/codegen-anyval-ir.cc"
-#include "exec/aggregation-node-ir.cc"
-#include "exec/hash-join-node-ir.cc"
 #include "exec/hash-table-ir.cc"
 #include "exec/hdfs-avro-scanner-ir.cc"
 #include "exec/hdfs-parquet-scanner-ir.cc"
 #include "exec/hdfs-scanner-ir.cc"
-#include "exec/old-hash-table-ir.cc"
 #include "exec/partitioned-aggregation-node-ir.cc"
 #include "exec/partitioned-hash-join-builder-ir.cc"
 #include "exec/partitioned-hash-join-node-ir.cc"

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/507bd8be/be/src/exec/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/be/src/exec/CMakeLists.txt b/be/src/exec/CMakeLists.txt
index 7d86f1c..a94a38d 100644
--- a/be/src/exec/CMakeLists.txt
+++ b/be/src/exec/CMakeLists.txt
@@ -25,8 +25,6 @@ set(LIBRARY_OUTPUT_PATH "${BUILD_OUTPUT_ROOT_DIRECTORY}/exec")
 set(EXECUTABLE_OUTPUT_PATH "${BUILD_OUTPUT_ROOT_DIRECTORY}/exec")
 
 add_library(Exec
-  aggregation-node.cc
-  aggregation-node-ir.cc
   analytic-eval-node.cc
   base-sequence-scanner.cc
   blocking-join-node.cc
@@ -39,10 +37,6 @@ add_library(Exec
   exchange-node.cc
   external-data-source-executor.cc
   filter-context.cc
-  hash-join-node.cc
-  hash-join-node-ir.cc
-  old-hash-table.cc
-  old-hash-table-ir.cc
   hash-table.cc
   hbase-table-sink.cc
   hbase-table-writer.cc
@@ -105,7 +99,6 @@ add_library(Exec
 add_dependencies(Exec thrift-deps)
 
 ADD_BE_TEST(zigzag-test)
-ADD_BE_TEST(old-hash-table-test)
 ADD_BE_TEST(hash-table-test)
 ADD_BE_TEST(delimited-text-parser-test)
 ADD_BE_TEST(read-write-util-test)

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/507bd8be/be/src/exec/aggregation-node-ir.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/aggregation-node-ir.cc b/be/src/exec/aggregation-node-ir.cc
deleted file mode 100644
index af13b68..0000000
--- a/be/src/exec/aggregation-node-ir.cc
+++ /dev/null
@@ -1,55 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "exec/aggregation-node.h"
-
-#include "exec/old-hash-table.inline.h"
-#include "runtime/row-batch.h"
-#include "runtime/runtime-state.h"
-#include "runtime/tuple.h"
-#include "runtime/tuple-row.h"
-
-using namespace impala;
-
-// Functions in this file are cross compiled to IR with clang.  These functions
-// are modified at runtime with a query specific codegen'd UpdateAggTuple
-
-AggFnEvaluator* const* AggregationNode::agg_fn_evals() const {
-  return agg_fn_evals_.data();
-}
-
-void AggregationNode::ProcessRowBatchNoGrouping(RowBatch* batch) {
-  for (int i = 0; i < batch->num_rows(); ++i) {
-    UpdateTuple(singleton_intermediate_tuple_, batch->GetRow(i));
-  }
-}
-
-void AggregationNode::ProcessRowBatchWithGrouping(RowBatch* batch) {
-  for (int i = 0; i < batch->num_rows(); ++i) {
-    TupleRow* row = batch->GetRow(i);
-    Tuple* agg_tuple = NULL;
-    OldHashTable::Iterator it = hash_tbl_->Find(row);
-    if (it.AtEnd()) {
-      agg_tuple = ConstructIntermediateTuple();
-      hash_tbl_->Insert(agg_tuple);
-    } else {
-      agg_tuple = it.GetTuple();
-    }
-    UpdateTuple(agg_tuple, row);
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/507bd8be/be/src/exec/aggregation-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/aggregation-node.cc b/be/src/exec/aggregation-node.cc
deleted file mode 100644
index 13f7dd3..0000000
--- a/be/src/exec/aggregation-node.cc
+++ /dev/null
@@ -1,878 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "exec/aggregation-node.h"
-
-#include <math.h>
-#include <sstream>
-#include <boost/functional/hash.hpp>
-#include <thrift/protocol/TDebugProtocol.h>
-
-#include <x86intrin.h>
-
-#include "codegen/codegen-anyval.h"
-#include "codegen/llvm-codegen.h"
-#include "exec/old-hash-table.inline.h"
-#include "exprs/agg-fn-evaluator.h"
-#include "exprs/scalar-expr.h"
-#include "exprs/scalar-expr-evaluator.h"
-#include "exprs/slot-ref.h"
-#include "runtime/descriptors.h"
-#include "runtime/mem-pool.h"
-#include "runtime/raw-value.h"
-#include "runtime/row-batch.h"
-#include "runtime/runtime-state.h"
-#include "runtime/string-value.inline.h"
-#include "runtime/tuple.h"
-#include "runtime/tuple-row.h"
-#include "udf/udf-internal.h"
-#include "util/debug-util.h"
-#include "util/runtime-profile-counters.h"
-
-#include "gen-cpp/Exprs_types.h"
-#include "gen-cpp/PlanNodes_types.h"
-
-#include "common/names.h"
-
-using namespace impala;
-using namespace llvm;
-
-namespace impala {
-
-const char* AggregationNode::LLVM_CLASS_NAME = "class.impala::AggregationNode";
-
-// TODO: pass in maximum size; enforce by setting limit in mempool
-AggregationNode::AggregationNode(ObjectPool* pool, const TPlanNode& tnode,
-    const DescriptorTbl& descs)
-  : ExecNode(pool, tnode, descs),
-    intermediate_tuple_id_(tnode.agg_node.intermediate_tuple_id),
-    intermediate_tuple_desc_(descs.GetTupleDescriptor(intermediate_tuple_id_)),
-    intermediate_row_desc_(pool->Add(new RowDescriptor(intermediate_tuple_desc_, false))),
-    output_tuple_id_(tnode.agg_node.output_tuple_id),
-    output_tuple_desc_(descs.GetTupleDescriptor(output_tuple_id_)),
-    singleton_intermediate_tuple_(nullptr),
-    codegen_process_row_batch_fn_(nullptr),
-    process_row_batch_fn_(nullptr),
-    needs_finalize_(tnode.agg_node.need_finalize),
-    build_timer_(nullptr),
-    get_results_timer_(nullptr),
-    hash_table_buckets_counter_(nullptr) {
-  DCHECK_EQ(intermediate_tuple_desc_->slots().size(), output_tuple_desc_->slots().size());
-}
-
-Status AggregationNode::Init(const TPlanNode& tnode, RuntimeState* state) {
-  DCHECK(intermediate_tuple_desc_ != nullptr);
-  DCHECK(output_tuple_desc_ != nullptr);
-  RETURN_IF_ERROR(ExecNode::Init(tnode, state));
-
-  const RowDescriptor& row_desc = *child(0)->row_desc();
-  RETURN_IF_ERROR(ScalarExpr::Create(tnode.agg_node.grouping_exprs, row_desc, state,
-      &grouping_exprs_));
-  for (int i = 0; i < grouping_exprs_.size(); ++i) {
-    SlotDescriptor* desc = intermediate_tuple_desc_->slots()[i];
-    DCHECK(desc->type().type == TYPE_NULL ||
-        desc->type() == grouping_exprs_[i]->type());
-    // TODO: Generate the build exprs in the FE such that the existing logic
-    // for handling NULL_TYPE works.
-    SlotRef* build_expr = pool_->Add(desc->type().type != TYPE_NULL ?
-        new SlotRef(desc) : new SlotRef(desc, TYPE_BOOLEAN));
-    build_exprs_.push_back(build_expr);
-    RETURN_IF_ERROR(build_expr->Init(*intermediate_row_desc_, state));
-  }
-
-  int j = grouping_exprs_.size();
-  for (int i = 0; i < tnode.agg_node.aggregate_functions.size(); ++i, ++j) {
-    SlotDescriptor* intermediate_slot_desc = intermediate_tuple_desc_->slots()[j];
-    SlotDescriptor* output_slot_desc = output_tuple_desc_->slots()[j];
-    AggFn* agg_fn;
-    RETURN_IF_ERROR(AggFn::Create(tnode.agg_node.aggregate_functions[i], row_desc,
-        *intermediate_slot_desc, *output_slot_desc, state, &agg_fn));
-    agg_fns_.push_back(agg_fn);
-  }
-  return Status::OK();
-}
-
-Status AggregationNode::Prepare(RuntimeState* state) {
-  DCHECK(output_iterator_.AtEnd());
-  SCOPED_TIMER(runtime_profile_->total_time_counter());
-  RETURN_IF_ERROR(ExecNode::Prepare(state));
-
-  tuple_pool_.reset(new MemPool(mem_tracker()));
-  agg_fn_pool_.reset(new MemPool(expr_mem_tracker()));
-  build_timer_ = ADD_TIMER(runtime_profile(), "BuildTime");
-  get_results_timer_ = ADD_TIMER(runtime_profile(), "GetResultsTime");
-  hash_table_buckets_counter_ =
-      ADD_COUNTER(runtime_profile(), "BuildBuckets", TUnit::UNIT);
-  hash_table_load_factor_counter_ =
-      ADD_COUNTER(runtime_profile(), "LoadFactor", TUnit::DOUBLE_VALUE);
-
-  RETURN_IF_ERROR(AggFnEvaluator::Create(agg_fns_, state, pool_, agg_fn_pool_.get(),
-      &agg_fn_evals_));
-  DCHECK_EQ(agg_fns_.size(), agg_fn_evals_.size());
-
-  // TODO: how many buckets?
-  vector<ScalarExpr*>* filter_exprs = pool_->Add(new vector<ScalarExpr*>());
-  RETURN_IF_ERROR(OldHashTable::Create(pool_, state, build_exprs_, grouping_exprs_,
-      *filter_exprs, 1, true, vector<bool>(build_exprs_.size(), true),
-      id(), mem_tracker(), vector<RuntimeFilter*>(), &hash_tbl_, true));
-  AddCodegenDisabledMessage(state);
-  return Status::OK();
-}
-
-void AggregationNode::Codegen(RuntimeState* state) {
-  DCHECK(state->ShouldCodegen());
-  ExecNode::Codegen(state);
-  if (IsNodeCodegenDisabled()) return;
-
-  bool codegen_enabled = false;
-  LlvmCodeGen* codegen = state->codegen();
-  DCHECK(codegen != nullptr);
-  Function* update_tuple_fn = CodegenUpdateTuple(codegen);
-  if (update_tuple_fn != nullptr) {
-    codegen_process_row_batch_fn_ = CodegenProcessRowBatch(codegen, update_tuple_fn);
-    if (codegen_process_row_batch_fn_ != nullptr) {
-      // Update to using codegen'd process row batch.
-      codegen->AddFunctionToJit(codegen_process_row_batch_fn_,
-          reinterpret_cast<void**>(&process_row_batch_fn_));
-      codegen_enabled = true;
-    }
-  }
-  runtime_profile()->AddCodegenMsg(codegen_enabled);
-}
-
-Status AggregationNode::Open(RuntimeState* state) {
-  SCOPED_TIMER(runtime_profile_->total_time_counter());
-  RETURN_IF_ERROR(ExecNode::Open(state));
-  RETURN_IF_ERROR(hash_tbl_->Open(state));
-  RETURN_IF_ERROR(AggFnEvaluator::Open(agg_fn_evals_, state));
-
-  if (grouping_exprs_.empty()) {
-    // Create single intermediate tuple. This must happen after
-    // opening the aggregate evaluators.
-    singleton_intermediate_tuple_ = ConstructIntermediateTuple();
-    // Check for failures during AggFnEvaluator::Init().
-    RETURN_IF_ERROR(state->GetQueryStatus());
-    hash_tbl_->Insert(singleton_intermediate_tuple_);
-  }
-
-  RETURN_IF_ERROR(children_[0]->Open(state));
-
-  RowBatch batch(children_[0]->row_desc(), state->batch_size(), mem_tracker());
-  int64_t num_input_rows = 0;
-  while (true) {
-    bool eos;
-    RETURN_IF_CANCELLED(state);
-    RETURN_IF_ERROR(QueryMaintenance(state));
-    RETURN_IF_ERROR(children_[0]->GetNext(state, &batch, &eos));
-    SCOPED_TIMER(build_timer_);
-
-    if (VLOG_ROW_IS_ON) {
-      for (int i = 0; i < batch.num_rows(); ++i) {
-        TupleRow* row = batch.GetRow(i);
-        VLOG_ROW << "input row: " << PrintRow(row, *children_[0]->row_desc());
-      }
-    }
-    if (process_row_batch_fn_ != nullptr) {
-      process_row_batch_fn_(this, &batch);
-    } else if (grouping_exprs_.empty()) {
-      ProcessRowBatchNoGrouping(&batch);
-    } else {
-      ProcessRowBatchWithGrouping(&batch);
-    }
-    COUNTER_SET(hash_table_buckets_counter_, hash_tbl_->num_buckets());
-    COUNTER_SET(hash_table_load_factor_counter_, hash_tbl_->load_factor());
-    num_input_rows += batch.num_rows();
-    // We must set output_iterator_ here, rather than outside the loop, because
-    // output_iterator_ must be set if the function returns within the loop
-    output_iterator_ = hash_tbl_->Begin();
-
-    batch.Reset();
-    RETURN_IF_ERROR(QueryMaintenance(state));
-    if (eos) break;
-  }
-
-  // We have consumed all of the input from the child and transfered ownership of the
-  // resources we need, so the child can be closed safely to release its resources.
-  child(0)->Close(state);
-  VLOG_FILE << "aggregated " << num_input_rows << " input rows into "
-            << hash_tbl_->size() << " output rows";
-  return Status::OK();
-}
-
-Status AggregationNode::GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos) {
-  SCOPED_TIMER(runtime_profile_->total_time_counter());
-  RETURN_IF_ERROR(ExecDebugAction(TExecNodePhase::GETNEXT, state));
-  RETURN_IF_CANCELLED(state);
-  RETURN_IF_ERROR(QueryMaintenance(state));
-  SCOPED_TIMER(get_results_timer_);
-
-  if (ReachedLimit()) {
-    *eos = true;
-    return Status::OK();
-  }
-  *eos = false;
-  ScalarExprEvaluator* const* evals = conjunct_evals_.data();
-  int num_conjuncts = conjuncts_.size();
-  DCHECK_EQ(num_conjuncts, conjunct_evals_.size());
-
-  int count = 0;
-  const int N = state->batch_size();
-  while (!output_iterator_.AtEnd() && !row_batch->AtCapacity()) {
-    // This loop can go on for a long time if the conjuncts are very selective. Do query
-    // maintenance every N iterations.
-    if (count++ % N == 0) {
-      RETURN_IF_CANCELLED(state);
-      RETURN_IF_ERROR(QueryMaintenance(state));
-    }
-    int row_idx = row_batch->AddRow();
-    TupleRow* row = row_batch->GetRow(row_idx);
-    Tuple* intermediate_tuple = output_iterator_.GetTuple();
-    Tuple* output_tuple = FinalizeTuple(intermediate_tuple, row_batch->tuple_data_pool());
-    output_iterator_.Next<false>();
-    row->SetTuple(0, output_tuple);
-    if (ExecNode::EvalConjuncts(evals, num_conjuncts, row)) {
-      VLOG_ROW << "output row: " << PrintRow(row, *row_desc());
-      row_batch->CommitLastRow();
-      ++num_rows_returned_;
-      if (ReachedLimit()) break;
-    }
-  }
-  *eos = output_iterator_.AtEnd() || ReachedLimit();
-  COUNTER_SET(rows_returned_counter_, num_rows_returned_);
-  return Status::OK();
-}
-
-Status AggregationNode::Reset(RuntimeState* state) {
-  DCHECK(false) << "NYI";
-  return Status("NYI");
-}
-
-void AggregationNode::Close(RuntimeState* state) {
-  if (is_closed()) return;
-
-  // Iterate through the remaining rows in the hash table and call Serialize/Finalize on
-  // them in order to free any memory allocated by UDAs. Finalize() requires a dst tuple
-  // but we don't actually need the result, so allocate a single dummy tuple to avoid
-  // accumulating memory.
-  Tuple* dummy_dst = nullptr;
-  // 'tuple_pool_' can be NULL if Prepare() failed.
-  if (needs_finalize_ && tuple_pool_.get() != nullptr) {
-    dummy_dst = Tuple::Create(output_tuple_desc_->byte_size(), tuple_pool_.get());
-  }
-  while (!output_iterator_.AtEnd()) {
-    Tuple* tuple = output_iterator_.GetTuple();
-    if (needs_finalize_) {
-      AggFnEvaluator::Finalize(agg_fn_evals_, tuple, dummy_dst);
-    } else {
-      AggFnEvaluator::Serialize(agg_fn_evals_, tuple);
-    }
-    output_iterator_.Next<false>();
-  }
-
-  if (tuple_pool_.get() != nullptr) tuple_pool_->FreeAll();
-  if (hash_tbl_.get() != nullptr) hash_tbl_->Close(state);
-
-  AggFnEvaluator::Close(agg_fn_evals_, state);
-  agg_fn_evals_.clear();
-  AggFn::Close(agg_fns_);
-  if (agg_fn_pool_.get() != nullptr) agg_fn_pool_->FreeAll();
-
-  ScalarExpr::Close(grouping_exprs_);
-  ScalarExpr::Close(build_exprs_);
-  ExecNode::Close(state);
-}
-
-Status AggregationNode::QueryMaintenance(RuntimeState* state) {
-  if (hash_tbl_.get() != nullptr) hash_tbl_->FreeLocalAllocations();
-  return ExecNode::QueryMaintenance(state);
-}
-
-Tuple* AggregationNode::ConstructIntermediateTuple() {
-  Tuple* intermediate_tuple = Tuple::Create(
-      intermediate_tuple_desc_->byte_size(), tuple_pool_.get());
-  vector<SlotDescriptor*>::const_iterator slot_desc =
-      intermediate_tuple_desc_->slots().begin();
-
-  // copy grouping values
-  for (int i = 0; i < grouping_exprs_.size(); ++i, ++slot_desc) {
-    if (hash_tbl_->last_expr_value_null(i)) {
-      intermediate_tuple->SetNull((*slot_desc)->null_indicator_offset());
-    } else {
-      void* src = hash_tbl_->last_expr_value(i);
-      void* dst = intermediate_tuple->GetSlot((*slot_desc)->tuple_offset());
-      RawValue::Write(src, dst, (*slot_desc)->type(), tuple_pool_.get());
-    }
-  }
-
-  // Initialize aggregate output.
-  DCHECK_EQ(agg_fns_.size(), agg_fn_evals_.size());
-  for (int i = 0; i < agg_fns_.size(); ++i, ++slot_desc) {
-    AggFnEvaluator* eval = agg_fn_evals_[i];
-    eval->Init(intermediate_tuple);
-    // Codegen specific path.
-    // To minimize branching on the UpdateTuple path, initialize the result value
-    // so that UpdateTuple doesn't have to check if the aggregation
-    // dst slot is null.
-    //  - sum/count: 0
-    //  - min: max_value
-    //  - max: min_value
-    // TODO: remove when we don't use the irbuilder for codegen here.
-    // This optimization no longer applies with AnyVal
-    if ((*slot_desc)->type().type != TYPE_STRING &&
-        (*slot_desc)->type().type != TYPE_VARCHAR &&
-        (*slot_desc)->type().type != TYPE_TIMESTAMP &&
-        (*slot_desc)->type().type != TYPE_CHAR &&
-        (*slot_desc)->type().type != TYPE_DECIMAL) {
-      ExprValue default_value;
-      void* default_value_ptr = nullptr;
-      switch (agg_fns_[i]->agg_op()) {
-        case AggFn::MIN:
-          default_value_ptr = default_value.SetToMax((*slot_desc)->type());
-          RawValue::Write(default_value_ptr, intermediate_tuple, *slot_desc, nullptr);
-          break;
-        case AggFn::MAX:
-          default_value_ptr = default_value.SetToMin((*slot_desc)->type());
-          RawValue::Write(default_value_ptr, intermediate_tuple, *slot_desc, nullptr);
-          break;
-        default:
-          break;
-      }
-    }
-  }
-  return intermediate_tuple;
-}
-
-void AggregationNode::UpdateTuple(Tuple* tuple, TupleRow* row) {
-  DCHECK(tuple != nullptr || agg_fn_evals_.empty());
-  AggFnEvaluator::Add(agg_fn_evals_, row, tuple);
-}
-
-Tuple* AggregationNode::FinalizeTuple(Tuple* tuple, MemPool* pool) {
-  DCHECK(tuple != nullptr || agg_fn_evals_.empty());
-  DCHECK(output_tuple_desc_ != nullptr);
-
-  Tuple* dst = tuple;
-  if (needs_finalize_ && intermediate_tuple_id_ != output_tuple_id_) {
-    dst = Tuple::Create(output_tuple_desc_->byte_size(), pool);
-  }
-  if (needs_finalize_) {
-    AggFnEvaluator::Finalize(agg_fn_evals_, tuple, dst);
-  } else {
-    AggFnEvaluator::Serialize(agg_fn_evals_, tuple);
-  }
-  // Copy grouping values from tuple to dst.
-  // TODO: Codegen this.
-  if (dst != tuple) {
-    int num_grouping_slots = grouping_exprs_.size();
-    for (int i = 0; i < num_grouping_slots; ++i) {
-      SlotDescriptor* src_slot_desc = intermediate_tuple_desc_->slots()[i];
-      SlotDescriptor* dst_slot_desc = output_tuple_desc_->slots()[i];
-      bool src_slot_null = tuple->IsNull(src_slot_desc->null_indicator_offset());
-      void* src_slot = nullptr;
-      if (!src_slot_null) src_slot = tuple->GetSlot(src_slot_desc->tuple_offset());
-      RawValue::Write(src_slot, dst, dst_slot_desc, nullptr);
-    }
-  }
-  return dst;
-}
-
-void AggregationNode::DebugString(int indentation_level, stringstream* out) const {
-  *out << string(indentation_level * 2, ' ');
-  *out << "AggregationNode("
-       << "intermediate_tuple_id=" << intermediate_tuple_id_
-       << " output_tuple_id=" << output_tuple_id_
-       << " needs_finalize=" << needs_finalize_
-       << " grouping_exprs=" << ScalarExpr::DebugString(grouping_exprs_)
-       << " agg_exprs=" << AggFn::DebugString(agg_fns_);
-  ExecNode::DebugString(indentation_level, out);
-  *out << ")";
-}
-
-IRFunction::Type GetHllUpdateFunction2(const ColumnType& type) {
-  switch (type.type) {
-    case TYPE_BOOLEAN: return IRFunction::HLL_UPDATE_BOOLEAN;
-    case TYPE_TINYINT: return IRFunction::HLL_UPDATE_TINYINT;
-    case TYPE_SMALLINT: return IRFunction::HLL_UPDATE_SMALLINT;
-    case TYPE_INT: return IRFunction::HLL_UPDATE_INT;
-    case TYPE_BIGINT: return IRFunction::HLL_UPDATE_BIGINT;
-    case TYPE_FLOAT: return IRFunction::HLL_UPDATE_FLOAT;
-    case TYPE_DOUBLE: return IRFunction::HLL_UPDATE_DOUBLE;
-    case TYPE_STRING: return IRFunction::HLL_UPDATE_STRING;
-    case TYPE_DECIMAL: return IRFunction::HLL_UPDATE_DECIMAL;
-    default:
-      DCHECK(false) << "Unsupported type: " << type;
-      return IRFunction::FN_END;
-  }
-}
-
-// IR Generation for updating a single aggregation slot. Signature is:
-// void UpdateSlot(FunctionContext* fn_ctx, ScalarExprEvaluator* expr_eval,
-//     AggTuple* agg_tuple, char** row)
-//
-// The IR for sum(double_col) is:
-//
-// define void @UpdateSlot(%"class.impala::AggFnEvaluator"* %agg_fn_eval,
-//     <{ double, i8 }>* %agg_tuple, %"class.impala::TupleRow"* %row) #32 {
-// entry:
-//   %input_evals_vector = call %"class.impala::ScalarExprEvaluator"**
-//       @_ZNK6impala14AggFnEvaluator11input_evalsEv(
-//           %"class.impala::AggFnEvaluator"* %agg_fn_eval)
-//   %0 = getelementptr %"class.impala::ScalarExprEvaluator"*,
-//       %"class.impala::ScalarExprEvaluator"** %input_evals_vector, i32 0
-//   %input_eval = load %"class.impala::ScalarExprEvaluator"*,
-//       %"class.impala::ScalarExprEvaluator"** %0
-//   %src = call { i8, double } @GetSlotRef(
-//       %"class.impala::ScalarExprEvaluator"* %input_eval,
-//       %"class.impala::TupleRow"* %row)
-//   %1 = extractvalue { i8, double } %src, 0
-//   %is_null = trunc i8 %1 to i1
-//   br i1 %is_null, label %ret, label %src_not_null
-//
-// src_not_null:                                     ; preds = %entry
-//   %dst_slot_ptr = getelementptr inbounds <{ double, i8 }>,
-//       <{ double, i8 }>* %agg_tuple, i32 0, i32 0
-//   %2 = bitcast <{ double, i8 }>* %agg_tuple to i8*
-//   %null_byte_ptr = getelementptr inbounds i8, i8* %2, i32 8
-//   %null_byte = load i8, i8* %null_byte_ptr
-//   %null_bit_cleared = and i8 %null_byte, -2
-//   store i8 %null_bit_cleared, i8* %null_byte_ptr
-//   %dst_val = load double, double* %dst_slot_ptr
-//   %val = extractvalue { i8, double } %src, 1
-//   %3 = fadd double %dst_val, %val
-//   store double %3, double* %dst_slot_ptr
-//   br label %ret
-//
-// ret:                                              ; preds = %src_not_null, %entry
-//   ret void
-// }
-//
-// The IR for ndv(double_col) is:
-//
-// define void @UpdateSlot(%"class.impala::AggFnEvaluator"* %agg_fn_eval,
-//     <{ %"struct.impala::StringValue" }>* %agg_tuple,
-//     %"class.impala::TupleRow"* %row) #32 {
-// entry:
-//   %dst_lowered_ptr = alloca { i64, i8* }
-//   %0 = alloca { i8, double }
-//   %input_evals_vector = call %"class.impala::ScalarExprEvaluator"**
-//       @_ZNK6impala14AggFnEvaluator11input_evalsEv(
-//           %"class.impala::AggFnEvaluator"* %agg_fn_eval)
-//   %1 = getelementptr %"class.impala::ScalarExprEvaluator"*,
-//       %"class.impala::ScalarExprEvaluator"** %input_evals_vector, i32 0
-//   %input_eval = load %"class.impala::ScalarExprEvaluator"*,
-//       %"class.impala::ScalarExprEvaluator"** %1
-//   %src = call { i8, double } @GetSlotRef(
-//       %"class.impala::ScalarExprEvaluator"* %input_eval,
-//           %"class.impala::TupleRow"* %row)
-//   %2 = extractvalue { i8, double } %src, 0
-//   %is_null = trunc i8 %2 to i1
-//   br i1 %is_null, label %ret, label %src_not_null
-//
-// src_not_null:                                     ; preds = %entry
-//   %dst_slot_ptr = getelementptr inbounds <{ %"struct.impala::StringValue" }>,
-//       <{ %"struct.impala::StringValue" }>* %agg_tuple, i32 0, i32 0
-//   %dst_val =
-//       load %"struct.impala::StringValue", %"struct.impala::StringValue"* %dst_slot_ptr
-//   store { i8, double } %src, { i8, double }* %0
-//   %src_unlowered_ptr = bitcast { i8, double }* %0 to %"struct.impala_udf::DoubleVal"*
-//   %ptr = extractvalue %"struct.impala::StringValue" %dst_val, 0
-//   %dst_stringval = insertvalue { i64, i8* } zeroinitializer, i8* %ptr, 1
-//   %len = extractvalue %"struct.impala::StringValue" %dst_val, 1
-//   %3 = extractvalue { i64, i8* } %dst_stringval, 0
-//   %4 = zext i32 %len to i64
-//   %5 = shl i64 %4, 32
-//   %6 = and i64 %3, 4294967295
-//   %7 = or i64 %6, %5
-//   %dst_stringval1 = insertvalue { i64, i8* } %dst_stringval, i64 %7, 0
-//   store { i64, i8* } %dst_stringval1, { i64, i8* }* %dst_lowered_ptr
-//   %dst_unlowered_ptr =
-//       bitcast { i64, i8* }* %dst_lowered_ptr to %"struct.impala_udf::StringVal"*
-//   %agg_fn_ctx_arg = call %"class.impala_udf::FunctionContext"*
-//       @_ZNK6impala14AggFnEvaluator10agg_fn_ctxEv(
-//            %"class.impala::AggFnEvaluator"* %agg_fn_eval)
-//   call void
-//       @_ZN6impala18AggregateFunctions9HllUpdateIN10impala_udf9DoubleValEEEvPNS2_15FunctionContextERKT_PNS2_9StringValE(
-//           %"class.impala_udf::FunctionContext"* %agg_fn_ctx_arg,
-//           %"struct.impala_udf::DoubleVal"* %src_unlowered_ptr,
-//           %"struct.impala_udf::StringVal"* %dst_unlowered_ptr)
-//   %anyval_result = load { i64, i8* }, { i64, i8* }* %dst_lowered_ptr
-//   %8 = extractvalue { i64, i8* } %anyval_result, 0
-//   %9 = ashr i64 %8, 32
-//   %10 = trunc i64 %9 to i32
-//   %11 = insertvalue %"struct.impala::StringValue" zeroinitializer, i32 %10, 1
-//   %12 = extractvalue { i64, i8* } %anyval_result, 1
-//   %13 = insertvalue %"struct.impala::StringValue" %11, i8* %12, 0
-//   store %"struct.impala::StringValue" %13, %"struct.impala::StringValue"* %dst_slot_ptr
-//   br label %ret
-//
-// ret:                                              ; preds = %src_not_null, %entry
-//   ret void
-// }
-//
-llvm::Function* AggregationNode::CodegenUpdateSlot(LlvmCodeGen* codegen,
-    int agg_fn_idx, SlotDescriptor* slot_desc) {
-  AggFn* agg_fn = agg_fns_[agg_fn_idx];
-  ScalarExpr* input_expr = agg_fn->GetChild(0);
-  // TODO: Fix this DCHECK and Init() once CodegenUpdateSlot() can handle AggFnEvaluator
-  // with multiple input expressions (e.g. group_concat).
-  DCHECK_EQ(agg_fn->GetNumChildren(), 1);
-  // TODO: implement timestamp
-  if (input_expr->type().type == TYPE_TIMESTAMP) return nullptr;
-
-  // Codegen the input expression's GetValue() function.
-  llvm::Function* input_expr_fn;
-  Status status = input_expr->GetCodegendComputeFn(codegen, &input_expr_fn);
-  if (!status.ok()) {
-    VLOG_QUERY << "Could not codegen UpdateSlot(): " << status.GetDetail();
-    return nullptr;
-  }
-  DCHECK(input_expr_fn != nullptr);
-
-  // Create the types of the UpdateSlot()'s arguments.
-  PointerType* agg_fn_eval_type =
-      codegen->GetPtrType(AggFnEvaluator::LLVM_CLASS_NAME);
-  StructType* tuple_struct = intermediate_tuple_desc_->GetLlvmStruct(codegen);
-  if (tuple_struct == nullptr) {
-    VLOG_QUERY << "Could not codegen UpdateSlot(): could not generate tuple struct.";
-    return nullptr;
-  }
-  PointerType* tuple_ptr_type = codegen->GetPtrType(tuple_struct);
-  PointerType* tuple_row_ptr_type = codegen->GetPtrType(TupleRow::LLVM_CLASS_NAME);
-
-  // Create UpdateSlot() prototype
-  LlvmCodeGen::FnPrototype prototype(codegen, "UpdateSlot", codegen->void_type());
-  prototype.AddArgument(
-      LlvmCodeGen::NamedVariable("agg_fn_eval", agg_fn_eval_type));
-  prototype.AddArgument(LlvmCodeGen::NamedVariable("agg_tuple", tuple_ptr_type));
-  prototype.AddArgument(LlvmCodeGen::NamedVariable("row", tuple_row_ptr_type));
-
-  LlvmBuilder builder(codegen->context());
-  Value* args[3];
-  Function* fn = prototype.GeneratePrototype(&builder, &args[0]);
-  Value* agg_fn_eval_arg = args[0];
-  Value* agg_tuple_arg = args[1];
-  Value* row_arg = args[2];
-
-  BasicBlock* src_not_null_block =
-      BasicBlock::Create(codegen->context(), "src_not_null", fn);
-  BasicBlock* ret_block = BasicBlock::Create(codegen->context(), "ret", fn);
-
-  // Get the first input expression's evaluator. This assumes there is only one
-  // input to the agg_fn. See DCHECK at the beginning of this function for it.
-  Value* input_evals_vector = codegen->CodegenCallFunction(&builder,
-      IRFunction::AGG_FN_EVALUATOR_INPUT_EVALUATORS, agg_fn_eval_arg,
-      "input_evals_vector");
-  Value* input_eval =
-      codegen->CodegenArrayAt(&builder, input_evals_vector, 0, "input_eval");
-
-  // Call expr function to get src slot value
-  CodegenAnyVal src = CodegenAnyVal::CreateCallWrapped(codegen, &builder,
-      input_expr->type(), input_expr_fn, {input_eval, row_arg}, "src");
-
-  Value* src_is_null = src.GetIsNull();
-  builder.CreateCondBr(src_is_null, ret_block, src_not_null_block);
-
-  // Src slot is not null, update dst_slot
-  builder.SetInsertPoint(src_not_null_block);
-  Value* dst_ptr = builder.CreateStructGEP(nullptr, agg_tuple_arg,
-      slot_desc->llvm_field_idx(), "dst_slot_ptr");
-  Value* result = nullptr;
-
-  if (slot_desc->is_nullable()) {
-    // Dst is nullptr, just update dst slot to src slot and clear null bit
-    slot_desc->CodegenSetNullIndicator(
-        codegen, &builder, agg_tuple_arg, codegen->false_value());
-  }
-
-  // Update the slot
-  Value* dst_value = builder.CreateLoad(dst_ptr, "dst_val");
-  switch (agg_fn->agg_op()) {
-    case AggFn::COUNT:
-      if (agg_fn->is_merge()) {
-        result = builder.CreateAdd(dst_value, src.GetVal(), "count_sum");
-      } else {
-        result = builder.CreateAdd(dst_value,
-            codegen->GetIntConstant(TYPE_BIGINT, 1), "count_inc");
-      }
-      break;
-    case AggFn::MIN: {
-      Function* min_fn = codegen->CodegenMinMax(slot_desc->type(), true);
-      Value* min_args[] = { dst_value, src.GetVal() };
-      result = builder.CreateCall(min_fn, min_args, "min_value");
-      break;
-    }
-    case AggFn::MAX: {
-      Function* max_fn = codegen->CodegenMinMax(slot_desc->type(), false);
-      Value* max_args[] = { dst_value, src.GetVal() };
-      result = builder.CreateCall(max_fn, max_args, "max_value");
-      break;
-    }
-    case AggFn::SUM:
-      if (slot_desc->type().type == TYPE_FLOAT || slot_desc->type().type == TYPE_DOUBLE) {
-        result = builder.CreateFAdd(dst_value, src.GetVal());
-      } else {
-        result = builder.CreateAdd(dst_value, src.GetVal());
-      }
-      break;
-    case AggFn::NDV: {
-      DCHECK_EQ(slot_desc->type().type, TYPE_STRING);
-      IRFunction::Type ir_function_type = agg_fn->is_merge() ? IRFunction::HLL_MERGE
-                                          : GetHllUpdateFunction2(input_expr->type());
-      Function* hll_fn = codegen->GetFunction(ir_function_type, false);
-
-      // Create pointer to src_anyval to pass to HllUpdate() function. We must use the
-      // unlowered type.
-      Value* src_unlowered_ptr = src.GetUnloweredPtr("src_unlowered_ptr");
-
-      // Create StringVal* intermediate argument from dst_value
-      CodegenAnyVal dst_stringval =
-          CodegenAnyVal::GetNonNullVal(codegen, &builder, TYPE_STRING, "dst_stringval");
-      dst_stringval.SetFromRawValue(dst_value);
-
-      // Create pointer to dst_stringval to pass to HllUpdate() function. We must use
-      // the unlowered type.
-      Value* dst_lowered_ptr = dst_stringval.GetLoweredPtr("dst_lowered_ptr");
-      Type* dst_unlowered_ptr_type =
-          codegen->GetPtrType(CodegenAnyVal::GetUnloweredType(codegen, TYPE_STRING));
-      Value* dst_unlowered_ptr = builder.CreateBitCast(
-          dst_lowered_ptr, dst_unlowered_ptr_type, "dst_unlowered_ptr");
-
-      // Get the FunctionContext object for the AggFnEvaluator.
-      Value* agg_fn_ctx_arg = codegen->CodegenCallFunction(&builder,
-          IRFunction::AGG_FN_EVALUATOR_AGG_FN_CTX, agg_fn_eval_arg,
-          "agg_fn_ctx_arg");
-
-      // Call 'hll_fn'
-      builder.CreateCall(hll_fn, {agg_fn_ctx_arg, src_unlowered_ptr, dst_unlowered_ptr});
-
-      // Convert StringVal intermediate 'dst_arg' back to StringValue
-      Value* anyval_result = builder.CreateLoad(dst_lowered_ptr, "anyval_result");
-      result = CodegenAnyVal(codegen, &builder, TYPE_STRING, anyval_result)
-               .ToNativeValue();
-      break;
-    }
-    default:
-      DCHECK(false) << "bad aggregate operator: " << agg_fn->agg_op();
-  }
-
-  builder.CreateStore(result, dst_ptr);
-  builder.CreateBr(ret_block);
-
-  builder.SetInsertPoint(ret_block);
-  builder.CreateRetVoid();
-
-  return codegen->FinalizeFunction(fn);
-}
-
-// IR codegen for the UpdateTuple loop.  This loop is query specific and
-// based on the aggregate functions.  The function signature must match the non-
-// codegen'd UpdateTuple exactly.
-// For the query:
-// select count(*), count(int_col), sum(double_col) the IR looks like:
-//
-// define void @UpdateTuple(%"class.impala::AggregationNode"* %this_ptr,
-//     %"class.impala::Tuple"* %agg_tuple, %"class.impala::TupleRow"* %tuple_row) #32 {
-// entry:
-//   %tuple = bitcast %"class.impala::Tuple"* %agg_tuple to <{ i64, i64, double, i8 }>*
-//   %agg_fn_evals = call %"class.impala::AggFnEvaluator"**
-//       @_ZNK6impala15AggregationNode12agg_fn_evalsEv(
-//           %"class.impala::AggregationNode"* %this_ptr)
-//   %src_slot = getelementptr inbounds <{ i64, i64, double, i8 }>,
-//       <{ i64, i64, double, i8 }>* %tuple, i32 0, i32 0
-//   %count_star_val = load i64, i64* %src_slot
-//   %count_star_inc = add i64 %count_star_val, 1
-//   store i64 %count_star_inc, i64* %src_slot
-//   %0 = getelementptr %"class.impala::AggFnEvaluator"*,
-//       %"class.impala::AggFnEvaluator"** %agg_fn_evals, i32 1
-//   %agg_fn_eval =
-//       load %"class.impala::AggFnEvaluator"*, %"class.impala::AggFnEvaluator"** %0
-//   call void @UpdateSlot(%"class.impala::AggFnEvaluator"* %agg_fn_eval,
-//       <{ i64, i64, double, i8 }>* %tuple, %"class.impala::TupleRow"* %tuple_row)
-//   %1 = getelementptr %"class.impala::AggFnEvaluator"*,
-//       %"class.impala::AggFnEvaluator"** %agg_fn_evals, i32 2
-//   %agg_fn_eval1 =
-//       load %"class.impala::AggFnEvaluator"*, %"class.impala::AggFnEvaluator"** %1
-//   call void @UpdateSlot.3(%"class.impala::AggFnEvaluator"* %agg_fn_eval1,
-//       <{ i64, i64, double, i8 }>* %tuple, %"class.impala::TupleRow"* %tuple_row)
-//   ret void
-// }
-//
-Function* AggregationNode::CodegenUpdateTuple(LlvmCodeGen* codegen) {
-  SCOPED_TIMER(codegen->codegen_timer());
-
-  int j = grouping_exprs_.size();
-  for (int i = 0; i < agg_fns_.size(); ++i, ++j) {
-    SlotDescriptor* slot_desc = intermediate_tuple_desc_->slots()[j];
-    AggFn* agg_fn = agg_fns_[i];
-
-    // Timestamp and char are never supported. NDV supports decimal and string but no
-    // other functions.
-    // TODO: the other aggregate functions might work with decimal as-is
-    if (slot_desc->type().type == TYPE_TIMESTAMP || slot_desc->type().type == TYPE_CHAR ||
-        (agg_fn->agg_op() != AggFn::NDV &&
-         (slot_desc->type().type == TYPE_DECIMAL ||
-          slot_desc->type().type == TYPE_STRING ||
-          slot_desc->type().type == TYPE_VARCHAR))) {
-      VLOG_QUERY << "Could not codegen UpdateIntermediateTuple because "
-                 << "string, char, timestamp and decimal are not yet supported.";
-      return nullptr;
-    }
-
-    // Don't codegen things that aren't builtins (for now)
-    if (!agg_fn->is_builtin()) return nullptr;
-  }
-
-  if (intermediate_tuple_desc_->GetLlvmStruct(codegen) == nullptr) {
-    VLOG_QUERY << "Could not codegen UpdateTuple because we could"
-               << "not generate a matching llvm struct for the intermediate tuple.";
-    return nullptr;
-  }
-
-  // Get the types to match the UpdateTuple signature
-  Type* agg_node_type = codegen->GetType(AggregationNode::LLVM_CLASS_NAME);
-  Type* agg_tuple_type = codegen->GetType(Tuple::LLVM_CLASS_NAME);
-  Type* tuple_row_type = codegen->GetType(TupleRow::LLVM_CLASS_NAME);
-
-  DCHECK(agg_node_type != nullptr);
-  DCHECK(agg_tuple_type != nullptr);
-  DCHECK(tuple_row_type != nullptr);
-
-  PointerType* agg_node_ptr_type = codegen->GetPtrType(agg_node_type);
-  PointerType* agg_tuple_ptr_type = codegen->GetPtrType(agg_tuple_type);
-  PointerType* tuple_row_ptr_type = codegen->GetPtrType(tuple_row_type);
-
-  // Signature for UpdateTuple is
-  // void UpdateTuple(AggregationNode* this, FunctionContext** fn_ctx,
-  //     ScalarExprEvaluator** expr_eval, Tuple* tuple, TupleRow* row)
-  // This signature needs to match the non-codegen'd signature exactly.
-  StructType* tuple_struct = intermediate_tuple_desc_->GetLlvmStruct(codegen);
-  if (tuple_struct == nullptr) {
-    VLOG_QUERY << "Could not codegen UpdateSlot(): could not generate tuple struct.";
-    return nullptr;
-  }
-  PointerType* tuple_ptr = PointerType::get(tuple_struct, 0);
-  LlvmCodeGen::FnPrototype prototype(codegen, "UpdateTuple", codegen->void_type());
-  prototype.AddArgument(LlvmCodeGen::NamedVariable("this_ptr", agg_node_ptr_type));
-  prototype.AddArgument(LlvmCodeGen::NamedVariable("agg_tuple", agg_tuple_ptr_type));
-  prototype.AddArgument(LlvmCodeGen::NamedVariable("tuple_row", tuple_row_ptr_type));
-
-  LlvmBuilder builder(codegen->context());
-  Value* args[3];
-  Function* fn = prototype.GeneratePrototype(&builder, &args[0]);
-
-  // Cast the parameter types to the internal llvm runtime types.
-  // TODO: get rid of this by using right type in function signature
-  Value* this_arg = args[0];
-  Value* agg_tuple_arg = builder.CreateBitCast(args[1], tuple_ptr, "tuple");
-  Value* row_arg = args[2];
-
-  // Load &agg_fn_evals_[0]
-  Value* agg_fn_evals_vector = codegen->CodegenCallFunction(&builder,
-      IRFunction::AGG_NODE_GET_AGG_FN_EVALUATORS, this_arg, "agg_fn_evals");
-
-  // Loop over each expr and generate the IR for that slot. If the expr is not
-  // count(*), generate a helper IR function to update the slot and call that.
-  j = grouping_exprs_.size();
-  for (int i = 0; i < agg_fns_.size(); ++i, ++j) {
-    SlotDescriptor* slot_desc = intermediate_tuple_desc_->slots()[j];
-    AggFn* agg_fn = agg_fns_[i];
-    if (agg_fn->is_count_star()) {
-      // TODO: we should be able to hoist this up to the loop over the batch and just
-      // increment the slot by the number of rows in the batch.
-      int field_idx = slot_desc->llvm_field_idx();
-      Value* const_one = codegen->GetIntConstant(TYPE_BIGINT, 1);
-      Value* slot_ptr = builder.CreateStructGEP(nullptr, agg_tuple_arg, field_idx,
-          "src_slot");
-      Value* slot_loaded = builder.CreateLoad(slot_ptr, "count_star_val");
-      Value* count_inc = builder.CreateAdd(slot_loaded, const_one, "count_star_inc");
-      builder.CreateStore(count_inc, slot_ptr);
-    } else {
-      Function* update_slot_fn = CodegenUpdateSlot(codegen, i, slot_desc);
-      if (update_slot_fn == nullptr) return nullptr;
-
-      // Load agg_fn_evals_[i]
-      DCHECK(agg_fn_evals_[i] != nullptr);
-      Value* agg_fn_eval_arg = codegen->CodegenArrayAt(
-          &builder, agg_fn_evals_vector, i, "agg_fn_eval");
-      builder.CreateCall(update_slot_fn, {agg_fn_eval_arg, agg_tuple_arg, row_arg});
-    }
-  }
-  builder.CreateRetVoid();
-
-  // CodegenProcessRowBatch() does the final optimizations.
-  return codegen->FinalizeFunction(fn);
-}
-
-Function* AggregationNode::CodegenProcessRowBatch(LlvmCodeGen* codegen,
-    Function* update_tuple_fn) {
-  SCOPED_TIMER(codegen->codegen_timer());
-  DCHECK(update_tuple_fn != nullptr);
-
-  // Get the cross compiled update row batch function
-  IRFunction::Type ir_fn = (!grouping_exprs_.empty() ?
-      IRFunction::AGG_NODE_PROCESS_ROW_BATCH_WITH_GROUPING :
-      IRFunction::AGG_NODE_PROCESS_ROW_BATCH_NO_GROUPING);
-  Function* process_batch_fn = codegen->GetFunction(ir_fn, true);
-
-  if (process_batch_fn == nullptr) {
-    LOG(ERROR) << "Could not find AggregationNode::ProcessRowBatch in module.";
-    return nullptr;
-  }
-
-  int replaced;
-  if (!grouping_exprs_.empty()) {
-    // Aggregation w/o grouping does not use a hash table.
-
-    // Codegen for hash
-    Function* hash_fn = hash_tbl_->CodegenHashCurrentRow(codegen);
-    if (hash_fn == nullptr) return nullptr;
-
-    // Codegen HashTable::Equals
-    Function* equals_fn = hash_tbl_->CodegenEquals(codegen);
-    if (equals_fn == nullptr) return nullptr;
-
-    // Codegen for evaluating build rows
-    Function* eval_build_row_fn = hash_tbl_->CodegenEvalTupleRow(codegen, true);
-    if (eval_build_row_fn == nullptr) return nullptr;
-
-    // Codegen for evaluating probe rows
-    Function* eval_probe_row_fn = hash_tbl_->CodegenEvalTupleRow(codegen, false);
-    if (eval_probe_row_fn == nullptr) return nullptr;
-
-    // Replace call sites
-    replaced =
-        codegen->ReplaceCallSites(process_batch_fn, eval_build_row_fn, "EvalBuildRow");
-    DCHECK_EQ(replaced, 1);
-
-    replaced =
-        codegen->ReplaceCallSites(process_batch_fn, eval_probe_row_fn, "EvalProbeRow");
-    DCHECK_EQ(replaced, 1);
-
-    replaced = codegen->ReplaceCallSites(process_batch_fn, hash_fn, "HashCurrentRow");
-    DCHECK_EQ(replaced, 2);
-
-    replaced = codegen->ReplaceCallSites(process_batch_fn, equals_fn, "Equals");
-    DCHECK_EQ(replaced, 1);
-  }
-
-  replaced = codegen->ReplaceCallSites(process_batch_fn, update_tuple_fn, "UpdateTuple");
-  DCHECK_EQ(replaced, 1);
-
-  return codegen->FinalizeFunction(process_batch_fn);
-}
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/507bd8be/be/src/exec/aggregation-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/aggregation-node.h b/be/src/exec/aggregation-node.h
deleted file mode 100644
index 0ea2f49..0000000
--- a/be/src/exec/aggregation-node.h
+++ /dev/null
@@ -1,174 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-
-#ifndef IMPALA_EXEC_AGGREGATION_NODE_H
-#define IMPALA_EXEC_AGGREGATION_NODE_H
-
-#include <boost/scoped_ptr.hpp>
-
-#include "exec/exec-node.h"
-#include "exec/old-hash-table.h"
-#include "exprs/agg-fn.h"
-#include "runtime/descriptors.h"  // for TupleId
-#include "runtime/mem-pool.h"
-#include "runtime/string-value.h"
-
-namespace llvm {
-  class Function;
-}
-
-namespace impala {
-
-class AggFnEvaluator;
-class LlvmCodeGen;
-class RowBatch;
-class RuntimeState;
-struct StringValue;
-class Tuple;
-class TupleDescriptor;
-class SlotDescriptor;
-
-/// Node for in-memory hash aggregation.
-/// The node creates a hash set of aggregation intermediate tuples, which
-/// contain slots for all grouping and aggregation exprs (the grouping
-/// slots precede the aggregation expr slots in the output tuple descriptor).
-//
-/// TODO: codegen cross-compiled UDAs and get rid of handcrafted IR.
-/// TODO: investigate high compile time for wide tables
-class AggregationNode : public ExecNode {
- public:
-  AggregationNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs);
-
-  virtual Status Init(const TPlanNode& tnode, RuntimeState* state);
-  virtual Status Prepare(RuntimeState* state);
-  virtual void Codegen(RuntimeState* state);
-  virtual Status Open(RuntimeState* state);
-  virtual Status GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos);
-  virtual Status Reset(RuntimeState* state);
-  virtual void Close(RuntimeState* state);
-
-  static const char* LLVM_CLASS_NAME;
-
- protected:
-  virtual Status QueryMaintenance(RuntimeState* state);
-  virtual void DebugString(int indentation_level, std::stringstream* out) const;
-
- private:
-  boost::scoped_ptr<OldHashTable> hash_tbl_;
-  OldHashTable::Iterator output_iterator_;
-
-  /// The list of all aggregate operations for this exec node.
-  std::vector<AggFn*> agg_fns_;
-  std::vector<AggFnEvaluator*> agg_fn_evals_;
-
-  /// Backing MemPools of 'agg_fn_evals_'.
-  boost::scoped_ptr<MemPool> agg_fn_pool_;
-
-  /// Group-by exprs used to evaluate input rows.
-  std::vector<ScalarExpr*> grouping_exprs_;
-
-  /// Exprs used to insert constructed aggregation tuple into the hash table.
-  /// All the exprs are simply SlotRefs for the intermediate tuple.
-  std::vector<ScalarExpr*> build_exprs_;
-
-  /// Tuple into which Update()/Merge()/Serialize() results are stored.
-  TupleId intermediate_tuple_id_;
-  TupleDescriptor* intermediate_tuple_desc_;
-
-  /// Construct a new row desc for preparing the build exprs because neither the child's
-  /// nor this node's output row desc may contain the intermediate tuple, e.g.,
-  /// in a single-node plan with an intermediate tuple different from the output tuple.
-  /// Lives in the query state's obj_pool.
-  RowDescriptor* intermediate_row_desc_;
-
-  /// Tuple into which Finalize() results are stored. Possibly the same as
-  /// the intermediate tuple.
-  TupleId output_tuple_id_;
-  TupleDescriptor* output_tuple_desc_;
-
-  /// Intermediate result of aggregation w/o GROUP BY.
-  /// Note: can be NULL even if there is no grouping if the result tuple is 0 width
-  Tuple* singleton_intermediate_tuple_;
-
-  boost::scoped_ptr<MemPool> tuple_pool_;
-
-  /// IR for process row batch.  NULL if codegen is disabled.
-  llvm::Function* codegen_process_row_batch_fn_;
-
-  typedef void (*ProcessRowBatchFn)(AggregationNode*, RowBatch*);
-  /// Jitted ProcessRowBatch function pointer.  Null if codegen is disabled.
-  ProcessRowBatchFn process_row_batch_fn_;
-
-  /// Certain aggregates require a finalize step, which is the final step of the
-  /// aggregate after consuming all input rows. The finalize step converts the aggregate
-  /// value into its final form. This is true if this node contains aggregate that requires
-  /// a finalize step.
-  bool needs_finalize_;
-
-  /// Time spent processing the child rows
-  RuntimeProfile::Counter* build_timer_;
-  /// Time spent returning the aggregated rows
-  RuntimeProfile::Counter* get_results_timer_;
-  /// Num buckets in hash table
-  RuntimeProfile::Counter* hash_table_buckets_counter_;
-  /// Load factor in hash table
-  RuntimeProfile::Counter* hash_table_load_factor_counter_;
-
-  /// Constructs a new aggregation intermediate tuple (allocated from tuple_pool_),
-  /// initialized to grouping values computed over 'current_row_'.
-  /// Aggregation expr slots are set to their initial values.
-  Tuple* ConstructIntermediateTuple();
-
-  /// Updates the aggregation intermediate tuple 'tuple' with aggregation values
-  /// computed over 'row'. This function is replaced by codegen.
-  void UpdateTuple(Tuple* tuple, TupleRow* row);
-
-  /// Called on the intermediate tuple of each group after all input rows have been
-  /// consumed and aggregated. Computes the final aggregate values to be returned in
-  /// GetNext() using the agg fn evaluators' Serialize() or Finalize().
-  /// For the Finalize() case if the output tuple is different from the intermediate
-  /// tuple, then a new tuple is allocated from 'pool' to hold the final result.
-  /// Returns the tuple holding the final aggregate values.
-  Tuple* FinalizeTuple(Tuple* tuple, MemPool* pool);
-
-  /// Cross-compiled accessor for 'agg_fn_evals_'. Used by the codegen'ed code.
-  AggFnEvaluator* const* IR_ALWAYS_INLINE agg_fn_evals() const;
-
-  /// Do the aggregation for all tuple rows in the batch
-  void ProcessRowBatchNoGrouping(RowBatch* batch);
-  void ProcessRowBatchWithGrouping(RowBatch* batch);
-
-  /// Codegen the process row batch loop.  The loop has already been compiled to
-  /// IR and loaded into the codegen object.  UpdateAggTuple has also been
-  /// codegen'd to IR.  This function will modify the loop subsituting the
-  /// UpdateAggTuple function call with the (inlined) codegen'd 'update_tuple_fn'.
-  llvm::Function* CodegenProcessRowBatch(LlvmCodeGen* codegen,
-      llvm::Function* update_tuple_fn);
-
-  /// Codegen for updating aggregate_exprs at agg_fn_idx. Returns NULL if unsuccessful.
-  /// agg_fn_idx is the idx into agg_fns_ (does not include grouping exprs).
-  llvm::Function* CodegenUpdateSlot(LlvmCodeGen* codegen, int agg_fn_idx,
-      SlotDescriptor* slot_desc);
-
-  /// Codegen UpdateTuple(). Returns NULL if codegen is unsuccessful.
-  llvm::Function* CodegenUpdateTuple(LlvmCodeGen* codegen);
-};
-
-}
-
-#endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/507bd8be/be/src/exec/blocking-join-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/blocking-join-node.cc b/be/src/exec/blocking-join-node.cc
index eeaccd6..7ccddc0 100644
--- a/be/src/exec/blocking-join-node.cc
+++ b/be/src/exec/blocking-join-node.cc
@@ -153,13 +153,7 @@ void BlockingJoinNode::ProcessBuildInputAsync(
     *status = child(1)->Open(state);
   }
   if (status->ok()) *status = AcquireResourcesForBuild(state);
-  if (status->ok()) {
-    if (build_sink == nullptr){
-      *status = ProcessBuildInput(state);
-    } else {
-      *status = SendBuildInputToSink<true>(state, build_sink);
-    }
-  }
+  if (status->ok()) *status = SendBuildInputToSink<true>(state, build_sink);
   // IMPALA-1863: If the build-side thread failed, then we need to close the right
   // (build-side) child to avoid a potential deadlock between fragment instances.  This
   // is safe to do because while the build may have partially completed, it will not be
@@ -226,11 +220,7 @@ Status BlockingJoinNode::ProcessBuildInputAndOpenProbe(
     RETURN_IF_ERROR(child(0)->Open(state));
     RETURN_IF_ERROR(child(1)->Open(state));
     RETURN_IF_ERROR(AcquireResourcesForBuild(state));
-    if (build_sink == NULL) {
-      RETURN_IF_ERROR(ProcessBuildInput(state));
-    } else {
-      RETURN_IF_ERROR(SendBuildInputToSink<false>(state, build_sink));
-    }
+    RETURN_IF_ERROR(SendBuildInputToSink<false>(state, build_sink));
   } else {
     // The left/right child never overlap. The overlap stops here.
     built_probe_overlap_stop_watch_.SetTimeCeiling();
@@ -238,11 +228,7 @@ Status BlockingJoinNode::ProcessBuildInputAndOpenProbe(
     // can release any resources only used during its Open().
     RETURN_IF_ERROR(child(1)->Open(state));
     RETURN_IF_ERROR(AcquireResourcesForBuild(state));
-    if (build_sink == NULL) {
-      RETURN_IF_ERROR(ProcessBuildInput(state));
-    } else {
-      RETURN_IF_ERROR(SendBuildInputToSink<false>(state, build_sink));
-    }
+    RETURN_IF_ERROR(SendBuildInputToSink<false>(state, build_sink));
     if (CanCloseBuildEarly()) child(1)->Close(state);
     RETURN_IF_ERROR(child(0)->Open(state));
   }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/507bd8be/be/src/exec/blocking-join-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/blocking-join-node.h b/be/src/exec/blocking-join-node.h
index 1972b34..b7dd79a 100644
--- a/be/src/exec/blocking-join-node.h
+++ b/be/src/exec/blocking-join-node.h
@@ -114,20 +114,9 @@ class BlockingJoinNode : public ExecNode {
   /// SendBuildInputToSink is called to allocate resources for this ExecNode.
   virtual Status AcquireResourcesForBuild(RuntimeState* state) { return Status::OK(); }
 
-  /// Processes the build-side input.
-  /// Called from ProcessBuildInputAndOpenProbe() if the subclass does not provide a
-  /// DataSink to consume the build input. The build-side input is already open when
-  /// this is called.
-  /// Note that this can be called concurrently with Open'ing the left child to
-  /// increase parallelism. If, for example, the left child is another join node,
-  /// it can start its own build at the same time.
-  /// TODO: move all subclasses to use the DataSink interface and remove this method.
-  virtual Status ProcessBuildInput(RuntimeState* state) = 0;
-
-  /// Processes the build-side input, which should be already open, and opens the probe
-  /// side. Will do both concurrently if not in a subplan and an extra thread token is
-  /// available. If 'build_sink' is non-NULL, sends the build-side input to 'build_sink'.
-  /// Otherwise calls ProcessBuildInput on the subclass.
+  /// Processes the build-side input, which should be already open, by sending it to
+  /// 'build_sink', wand opens the probe side. Will do both concurrently if not in a
+  /// subplan and an extra thread token is available.
   Status ProcessBuildInputAndOpenProbe(RuntimeState* state, DataSink* build_sink);
 
   /// Set up 'current_probe_row_' to point to the first input row from the left child

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/507bd8be/be/src/exec/exec-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/exec-node.cc b/be/src/exec/exec-node.cc
index 7954660..c3d9946 100644
--- a/be/src/exec/exec-node.cc
+++ b/be/src/exec/exec-node.cc
@@ -29,12 +29,10 @@
 #include "common/status.h"
 #include "exprs/scalar-expr.h"
 #include "exprs/scalar-expr-evaluator.h"
-#include "exec/aggregation-node.h"
 #include "exec/analytic-eval-node.h"
 #include "exec/data-source-scan-node.h"
 #include "exec/empty-set-node.h"
 #include "exec/exchange-node.h"
-#include "exec/hash-join-node.h"
 #include "exec/hbase-scan-node.h"
 #include "exec/hdfs-scan-node.h"
 #include "exec/hdfs-scan-node-mt.h"
@@ -64,9 +62,8 @@
 
 using namespace llvm;
 
-// TODO: remove when we remove hash-join-node.cc and aggregation-node.cc
-DEFINE_bool(enable_partitioned_hash_join, true, "Enable partitioned hash join");
-DEFINE_bool(enable_partitioned_aggregation, true, "Enable partitioned hash agg");
+DEFINE_bool(enable_partitioned_hash_join, true, "Deprecated - has no effect");
+DEFINE_bool(enable_partitioned_aggregation, true, "Deprecated - has no effect");
 
 namespace impala {
 
@@ -299,24 +296,10 @@ Status ExecNode::CreateNode(ObjectPool* pool, const TPlanNode& tnode,
       }
       break;
     case TPlanNodeType::AGGREGATION_NODE:
-      if (FLAGS_enable_partitioned_aggregation) {
-        *node = pool->Add(new PartitionedAggregationNode(pool, tnode, descs));
-      } else {
-        *node = pool->Add(new AggregationNode(pool, tnode, descs));
-      }
+      *node = pool->Add(new PartitionedAggregationNode(pool, tnode, descs));
       break;
     case TPlanNodeType::HASH_JOIN_NODE:
-      // The (old) HashJoinNode does not support left-anti, right-semi, and right-anti
-      // joins.
-      if (tnode.hash_join_node.join_op == TJoinOp::LEFT_ANTI_JOIN ||
-          tnode.hash_join_node.join_op == TJoinOp::RIGHT_SEMI_JOIN ||
-          tnode.hash_join_node.join_op == TJoinOp::RIGHT_ANTI_JOIN ||
-          tnode.hash_join_node.join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN ||
-          FLAGS_enable_partitioned_hash_join) {
-        *node = pool->Add(new PartitionedHashJoinNode(pool, tnode, descs));
-      } else {
-        *node = pool->Add(new HashJoinNode(pool, tnode, descs));
-      }
+      *node = pool->Add(new PartitionedHashJoinNode(pool, tnode, descs));
       break;
     case TPlanNodeType::NESTED_LOOP_JOIN_NODE:
       *node = pool->Add(new NestedLoopJoinNode(pool, tnode, descs));
@@ -350,13 +333,6 @@ Status ExecNode::CreateNode(ObjectPool* pool, const TPlanNode& tnode,
       *node = pool->Add(new SingularRowSrcNode(pool, tnode, descs));
       break;
     case TPlanNodeType::SUBPLAN_NODE:
-      if (!FLAGS_enable_partitioned_hash_join || !FLAGS_enable_partitioned_aggregation) {
-        error_msg << "Query referencing nested types is not supported because the "
-            << "--enable_partitioned_hash_join and/or --enable_partitioned_aggregation "
-            << "Impala Daemon start-up flags are set to false.\nTo enable nested types "
-            << "support please set those flags to true (they are enabled by default).";
-        return Status(error_msg.str());
-      }
       *node = pool->Add(new SubplanNode(pool, tnode, descs));
       break;
     case TPlanNodeType::UNNEST_NODE:

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/507bd8be/be/src/exec/hash-join-node-ir.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hash-join-node-ir.cc b/be/src/exec/hash-join-node-ir.cc
deleted file mode 100644
index 25aa556..0000000
--- a/be/src/exec/hash-join-node-ir.cc
+++ /dev/null
@@ -1,140 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "codegen/impala-ir.h"
-#include "exec/hash-join-node.h"
-#include "exec/old-hash-table.inline.h"
-#include "exprs/scalar-expr-evaluator.h"
-#include "runtime/row-batch.h"
-
-#include "common/names.h"
-
-using namespace impala;
-
-// Functions in this file are cross compiled to IR with clang.
-
-// Wrapper around ExecNode's eval conjuncts with a different function name.
-// This lets us distinguish between the join conjuncts vs. non-join conjuncts
-// for codegen.
-// Note: don't declare this static.  LLVM will pick the fastcc calling convention and
-// we will not be able to replace the functions with codegen'd versions.
-// TODO: explicitly set the calling convention?
-// TODO: investigate using fastcc for all codegen internal functions?
-bool IR_NO_INLINE EvalOtherJoinConjuncts2(
-    ScalarExprEvaluator* const* evals, int num_evals, TupleRow* row) {
-  return ExecNode::EvalConjuncts(evals, num_evals, row);
-}
-
-// CreateOutputRow, EvalOtherJoinConjuncts, and EvalConjuncts are replaced by
-// codegen.
-int HashJoinNode::ProcessProbeBatch(RowBatch* out_batch, RowBatch* probe_batch,
-    int max_added_rows) {
-  // This path does not handle full outer or right outer joins
-  DCHECK(!match_all_build_);
-
-  int row_idx = out_batch->AddRows(max_added_rows);
-  uint8_t* out_row_mem = reinterpret_cast<uint8_t*>(out_batch->GetRow(row_idx));
-  TupleRow* out_row = reinterpret_cast<TupleRow*>(out_row_mem);
-
-  int rows_returned = 0;
-  int probe_rows = probe_batch->num_rows();
-
-  ScalarExprEvaluator* const* other_conjunct_evals =
-      other_join_conjunct_evals_.data();
-  const int num_other_conjuncts = other_join_conjuncts_.size();
-  DCHECK_EQ(num_other_conjuncts, other_join_conjunct_evals_.size());
-
-  ScalarExprEvaluator* const* conjunct_evals = conjunct_evals_.data();
-  const int num_conjuncts = conjuncts_.size();
-  DCHECK_EQ(num_conjuncts, conjunct_evals_.size());
-
-  while (true) {
-    // Create output row for each matching build row
-    while (!hash_tbl_iterator_.AtEnd()) {
-      TupleRow* matched_build_row = hash_tbl_iterator_.GetRow();
-      hash_tbl_iterator_.Next<true>();
-
-      if (join_op_ == TJoinOp::LEFT_SEMI_JOIN) {
-        // Evaluate the non-equi-join conjuncts against a temp row assembled from all
-        // build and probe tuples.
-        if (num_other_conjuncts > 0) {
-          CreateOutputRow(semi_join_staging_row_, current_probe_row_, matched_build_row);
-          if (!EvalOtherJoinConjuncts2(other_conjunct_evals, num_other_conjuncts,
-                  semi_join_staging_row_)) {
-            continue;
-          }
-        }
-        out_batch->CopyRow(current_probe_row_, out_row);
-      } else {
-        CreateOutputRow(out_row, current_probe_row_, matched_build_row);
-        if (!EvalOtherJoinConjuncts2(
-                other_conjunct_evals, num_other_conjuncts, out_row)) {
-          continue;
-        }
-      }
-      matched_probe_ = true;
-
-      if (EvalConjuncts(conjunct_evals, num_conjuncts, out_row)) {
-        ++rows_returned;
-        // Filled up out batch or hit limit
-        if (UNLIKELY(rows_returned == max_added_rows)) goto end;
-        // Advance to next out row
-        out_row_mem += out_batch->row_byte_size();
-        out_row = reinterpret_cast<TupleRow*>(out_row_mem);
-      }
-
-      // Handle left semi-join
-      if (match_one_build_) {
-        hash_tbl_iterator_ = hash_tbl_->End();
-        break;
-      }
-    }
-
-    // Handle left outer-join
-    if (!matched_probe_ && match_all_probe_) {
-      CreateOutputRow(out_row, current_probe_row_, NULL);
-      matched_probe_ = true;
-      if (EvalConjuncts(conjunct_evals, num_conjuncts, out_row)) {
-        ++rows_returned;
-        if (UNLIKELY(rows_returned == max_added_rows)) goto end;
-        // Advance to next out row
-        out_row_mem += out_batch->row_byte_size();
-        out_row = reinterpret_cast<TupleRow*>(out_row_mem);
-      }
-    }
-
-    if (hash_tbl_iterator_.AtEnd()) {
-      // Advance to the next probe row
-      if (UNLIKELY(probe_batch_pos_ == probe_rows)) goto end;
-      current_probe_row_ = probe_batch->GetRow(probe_batch_pos_++);
-      hash_tbl_iterator_ = hash_tbl_->Find(current_probe_row_);
-      matched_probe_ = false;
-    }
-  }
-
-end:
-  if (match_one_build_ && matched_probe_) hash_tbl_iterator_ = hash_tbl_->End();
-  out_batch->CommitRows(rows_returned);
-  return rows_returned;
-}
-
-void HashJoinNode::ProcessBuildBatch(RowBatch* build_batch) {
-  // insert build row into our hash table
-  for (int i = 0; i < build_batch->num_rows(); ++i) {
-    hash_tbl_->Insert(build_batch->GetRow(i));
-  }
-}