You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2017/08/02 06:28:24 UTC

[1/6] incubator-impala git commit: IMPALA-5745: Bump Breakpad version

Repository: incubator-impala
Updated Branches:
  refs/heads/master ae7200874 -> 3f82d1570


IMPALA-5745: Bump Breakpad version

This change bumps the breakpad version to pull in a change that
increases the maximum number of memory regions in minidump_stackwalk.

Change-Id: I07b502fd5f6c5b3ce9eabf3b3ce6992d8f89f5af
Reviewed-on: http://gerrit.cloudera.org:8080/7547
Reviewed-by: Matthew Jacobs <mj...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/1f89d2dc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/1f89d2dc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/1f89d2dc

Branch: refs/heads/master
Commit: 1f89d2dc4452bed4e7590ec6e8b5ce65cfd95390
Parents: ae72008
Author: Lars Volker <lv...@cloudera.com>
Authored: Mon Jul 31 15:17:56 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Tue Aug 1 20:23:37 2017 +0000

----------------------------------------------------------------------
 bin/impala-config.sh | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1f89d2dc/bin/impala-config.sh
----------------------------------------------------------------------
diff --git a/bin/impala-config.sh b/bin/impala-config.sh
index fdf74ad..2699339 100755
--- a/bin/impala-config.sh
+++ b/bin/impala-config.sh
@@ -72,13 +72,13 @@ fi
 # moving to a different build of the toolchain, e.g. when a version is bumped or a
 # compile option is changed. The build id can be found in the output of the toolchain
 # build jobs, it is constructed from the build number and toolchain git hash prefix.
-export IMPALA_TOOLCHAIN_BUILD_ID=426-d9b44a854b
+export IMPALA_TOOLCHAIN_BUILD_ID=438-a80bf530fe
 # Versions of toolchain dependencies.
 # -----------------------------------
 export IMPALA_AVRO_VERSION=1.7.4-p4
 export IMPALA_BINUTILS_VERSION=2.26.1
 export IMPALA_BOOST_VERSION=1.57.0-p3
-export IMPALA_BREAKPAD_VERSION=ffe3e478657dc7126fca6329dfcedc49f4c726d9-p2
+export IMPALA_BREAKPAD_VERSION=1b704857f1e78a864e6942e613457e55f1aecb60-p2
 export IMPALA_BZIP2_VERSION=1.0.6-p2
 export IMPALA_CMAKE_VERSION=3.2.3-p1
 export IMPALA_CRCUTIL_VERSION=440ba7babeff77ffad992df3a10c767f184e946e-p1


[2/6] incubator-impala git commit: IMPALA-5616: Add --enable_minidumps startup flag

Posted by ta...@apache.org.
IMPALA-5616: Add --enable_minidumps startup flag

If set to 'false', this flag will disable registration of the Breakpad
signal handlers during startup. The default value is 'true'. This does
not affect the ability to disable the handlers by specifying an empty
value for --minidump_path.

This change adds a test to test_breakpad.py.

Change-Id: Ie2039b9140e1c281810b27b76140e2105198bc37
Reviewed-on: http://gerrit.cloudera.org:8080/7541
Reviewed-by: Lars Volker <lv...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/344c26aa
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/344c26aa
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/344c26aa

Branch: refs/heads/master
Commit: 344c26aa29cd5f0ec7711eb873cbfa2a4151942a
Parents: 1f89d2d
Author: Lars Volker <lv...@cloudera.com>
Authored: Mon Jul 31 13:08:39 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Wed Aug 2 01:32:06 2017 +0000

----------------------------------------------------------------------
 be/src/common/global-flags.cc         |  3 +++
 be/src/util/minidump.cc               |  2 ++
 tests/custom_cluster/test_breakpad.py | 13 +++++++++----
 3 files changed, 14 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/344c26aa/be/src/common/global-flags.cc
----------------------------------------------------------------------
diff --git a/be/src/common/global-flags.cc b/be/src/common/global-flags.cc
index 1a36fad..0a6ca28 100644
--- a/be/src/common/global-flags.cc
+++ b/be/src/common/global-flags.cc
@@ -70,6 +70,9 @@ DEFINE_string(redaction_rules_file, "", "Absolute path to sensitive data redacti
     "Web UI and audit records. Query results will not be affected. Refer to the "
     "documentation for the rule file format.");
 
+DEFINE_bool(enable_minidumps, true, "Whether to enable minidump generation upon process "
+    "crash or SIGUSR1.");
+
 DEFINE_string(minidump_path, "minidumps", "Directory to write minidump files to. This "
     "can be either an absolute path or a path relative to log_dir. Each daemon will "
     "create an additional sub-directory to prevent naming conflicts and to make it "

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/344c26aa/be/src/util/minidump.cc
----------------------------------------------------------------------
diff --git a/be/src/util/minidump.cc b/be/src/util/minidump.cc
index b87ea90..78c48df 100644
--- a/be/src/util/minidump.cc
+++ b/be/src/util/minidump.cc
@@ -44,6 +44,7 @@ using boost::filesystem::path;
 using boost::filesystem::remove;
 
 DECLARE_string(log_dir);
+DECLARE_bool(enable_minidumps);
 DECLARE_string(minidump_path);
 DECLARE_int32(max_minidumps);
 DECLARE_int32(minidump_size_limit_hint_kb);
@@ -193,6 +194,7 @@ Status RegisterMinidump(const char* cmd_line_path) {
   DCHECK(!registered);
   registered = true;
 
+  if (!FLAGS_enable_minidumps) return Status::OK();
   if (FLAGS_minidump_path.empty()) return Status::OK();
 
   if (path(FLAGS_minidump_path).is_relative()) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/344c26aa/tests/custom_cluster/test_breakpad.py
----------------------------------------------------------------------
diff --git a/tests/custom_cluster/test_breakpad.py b/tests/custom_cluster/test_breakpad.py
index 120e9b9..8300634 100644
--- a/tests/custom_cluster/test_breakpad.py
+++ b/tests/custom_cluster/test_breakpad.py
@@ -83,9 +83,6 @@ class TestBreakpad(CustomClusterTestSuite):
     self.start_cluster_with_args(minidump_path=self.tmp_dir,
                                  max_minidumps=self.MAX_MINIDUMPS)
 
-  def start_cluster_without_minidumps(self):
-    self.start_cluster_with_args(minidump_path='', max_minidumps=self.MAX_MINIDUMPS)
-
   def kill_cluster(self, signal):
     self.cluster.refresh()
     processes = self.cluster.impalads + [self.cluster.catalogd, self.cluster.statestored]
@@ -216,10 +213,18 @@ class TestBreakpad(CustomClusterTestSuite):
 
   @pytest.mark.execute_serially
   def test_disable_minidumps(self):
+    """Check that setting enable_minidumps to false disables minidump creation."""
+    assert self.count_all_minidumps() == 0
+    self.start_cluster_with_args(enable_minidumps=False)
+    self.kill_cluster(SIGSEGV)
+    self.assert_num_logfile_entries(0)
+
+  @pytest.mark.execute_serially
+  def test_empty_minidump_path_disables_breakpad(self):
     """Check that setting the minidump_path to an empty value disables minidump creation.
     """
     assert self.count_all_minidumps() == 0
-    self.start_cluster_without_minidumps()
+    self.start_cluster_with_args(minidump_path='', max_minidumps=self.MAX_MINIDUMPS)
     self.kill_cluster(SIGSEGV)
     self.assert_num_logfile_entries(0)
 


[5/6] incubator-impala git commit: IMPALA-4674: Part 1: remove old aggs and joins

Posted by ta...@apache.org.
IMPALA-4674: Part 1: remove old aggs and joins

This is intended to be merged at the same time as Part 2 but is
separated out to make the change more reviewable. Part 2 assumes
that it does not need special logic to handle this mode (e.g.
because the old aggs and joins don't use reservation).

Disable the --enable_partitioned_{aggregation,hash_join} options
and remove all product and test code associated with them.

Change-Id: I5ce2236d37c0ced188a4a81f7e00d4b8ac98e7e9
Reviewed-on: http://gerrit.cloudera.org:8080/7102
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/507bd8be
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/507bd8be
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/507bd8be

Branch: refs/heads/master
Commit: 507bd8be7e5e710b37f845ef6cf23238b426ee0a
Parents: 344c26a
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Tue Jun 6 10:45:15 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Wed Aug 2 01:49:12 2017 +0000

----------------------------------------------------------------------
 be/src/codegen/gen_ir_descriptions.py           |  18 -
 be/src/codegen/impala-ir.cc                     |   3 -
 be/src/exec/CMakeLists.txt                      |   7 -
 be/src/exec/aggregation-node-ir.cc              |  55 --
 be/src/exec/aggregation-node.cc                 | 878 -------------------
 be/src/exec/aggregation-node.h                  | 174 ----
 be/src/exec/blocking-join-node.cc               |  20 +-
 be/src/exec/blocking-join-node.h                |  17 +-
 be/src/exec/exec-node.cc                        |  32 +-
 be/src/exec/hash-join-node-ir.cc                | 140 ---
 be/src/exec/hash-join-node.cc                   | 673 --------------
 be/src/exec/hash-join-node.h                    | 164 ----
 be/src/exec/hdfs-parquet-scanner.cc             |   2 -
 be/src/exec/nested-loop-join-node.cc            |   5 -
 be/src/exec/nested-loop-join-node.h             |   3 -
 be/src/exec/old-hash-table-ir.cc                |  42 -
 be/src/exec/old-hash-table-test.cc              | 337 -------
 be/src/exec/old-hash-table.cc                   | 872 ------------------
 be/src/exec/old-hash-table.h                    | 548 ------------
 be/src/exec/old-hash-table.inline.h             | 189 ----
 be/src/exec/partitioned-hash-join-node.cc       |   5 -
 be/src/exec/partitioned-hash-join-node.h        |   1 -
 be/src/exprs/agg-fn-evaluator.cc                |   1 -
 be/src/runtime/row-batch.cc                     |  51 +-
 be/src/runtime/row-batch.h                      |  17 +-
 be/src/util/backend-gflag-util.cc               |   2 -
 common/thrift/BackendGflags.thrift              |   6 +-
 .../org/apache/impala/planner/JoinNode.java     |   3 +-
 .../apache/impala/service/BackendConfig.java    |   4 -
 .../queries/QueryTest/legacy-joins-aggs.test    |  45 -
 tests/common/environ.py                         |   6 -
 tests/common/skip.py                            |  12 +-
 tests/custom_cluster/test_legacy_joins_aggs.py  |  33 -
 tests/metadata/test_ddl.py                      |   3 +-
 tests/query_test/test_aggregation.py            |  14 +-
 tests/query_test/test_join_queries.py           |   2 -
 tests/query_test/test_mt_dop.py                 |   2 -
 tests/query_test/test_nested_types.py           |   4 -
 tests/query_test/test_runtime_filters.py        |   4 +-
 tests/query_test/test_scanners.py               |   3 -
 tests/query_test/test_tpch_nested_queries.py    |   2 -
 41 files changed, 36 insertions(+), 4363 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/507bd8be/be/src/codegen/gen_ir_descriptions.py
----------------------------------------------------------------------
diff --git a/be/src/codegen/gen_ir_descriptions.py b/be/src/codegen/gen_ir_descriptions.py
index ac36bb5..94dc86a 100755
--- a/be/src/codegen/gen_ir_descriptions.py
+++ b/be/src/codegen/gen_ir_descriptions.py
@@ -43,12 +43,6 @@ options, args = parser.parse_args()
 # the bit code module.
 # TODO: should we work out the mangling rules?
 ir_functions = [
-  ["AGG_NODE_PROCESS_ROW_BATCH_WITH_GROUPING",
-   "_ZN6impala15AggregationNode27ProcessRowBatchWithGroupingEPNS_8RowBatchE"],
-  ["AGG_NODE_PROCESS_ROW_BATCH_NO_GROUPING",
-   "_ZN6impala15AggregationNode25ProcessRowBatchNoGroupingEPNS_8RowBatchE"],
-  ["AGG_NODE_GET_AGG_FN_EVALUATORS",
-   "_ZNK6impala15AggregationNode12agg_fn_evalsEv"],
   ["AGG_FN_EVALUATOR_INPUT_EVALUATORS",
    "_ZNK6impala14AggFnEvaluator11input_evalsEv"],
   ["AGG_FN_EVALUATOR_AGG_FN_CTX",
@@ -104,18 +98,6 @@ ir_functions = [
   ["HASH_CRC", "IrCrcHash"],
   ["HASH_FNV", "IrFnvHash"],
   ["HASH_MURMUR", "IrMurmurHash"],
-  ["HASH_JOIN_PROCESS_BUILD_BATCH",
-   "_ZN6impala12HashJoinNode17ProcessBuildBatchEPNS_8RowBatchE"],
-  ["HASH_JOIN_PROCESS_PROBE_BATCH",
-   "_ZN6impala12HashJoinNode17ProcessProbeBatchEPNS_8RowBatchES2_i"],
-  ["OLD_HASH_TABLE_GET_BUILD_EXPR_EVALUATORS",
-   "_ZNK6impala12OldHashTable16build_expr_evalsEv"],
-  ["OLD_HASH_TABLE_GET_PROBE_EXPR_EVALUATORS",
-   "_ZNK6impala12OldHashTable16probe_expr_evalsEv"],
-  ["OLD_HASH_TABLE_GET_EXPR_VALUES_BUFFER",
-   "_ZNK6impala12OldHashTable18expr_values_bufferEv"],
-  ["OLD_HASH_TABLE_GET_EXPR_VALUE_NULL_BITS",
-   "_ZNK6impala12OldHashTable20expr_value_null_bitsEv"],
   ["PHJ_PROCESS_BUILD_BATCH",
    "_ZN6impala10PhjBuilder17ProcessBuildBatchEPNS_8RowBatchEPNS_12HashTableCtxEb"],
   ["PHJ_PROCESS_PROBE_BATCH_INNER_JOIN",

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/507bd8be/be/src/codegen/impala-ir.cc
----------------------------------------------------------------------
diff --git a/be/src/codegen/impala-ir.cc b/be/src/codegen/impala-ir.cc
index 2500a33..2992849 100644
--- a/be/src/codegen/impala-ir.cc
+++ b/be/src/codegen/impala-ir.cc
@@ -26,13 +26,10 @@
 #pragma clang diagnostic ignored "-Wheader-hygiene"
 
 #include "codegen/codegen-anyval-ir.cc"
-#include "exec/aggregation-node-ir.cc"
-#include "exec/hash-join-node-ir.cc"
 #include "exec/hash-table-ir.cc"
 #include "exec/hdfs-avro-scanner-ir.cc"
 #include "exec/hdfs-parquet-scanner-ir.cc"
 #include "exec/hdfs-scanner-ir.cc"
-#include "exec/old-hash-table-ir.cc"
 #include "exec/partitioned-aggregation-node-ir.cc"
 #include "exec/partitioned-hash-join-builder-ir.cc"
 #include "exec/partitioned-hash-join-node-ir.cc"

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/507bd8be/be/src/exec/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/be/src/exec/CMakeLists.txt b/be/src/exec/CMakeLists.txt
index 7d86f1c..a94a38d 100644
--- a/be/src/exec/CMakeLists.txt
+++ b/be/src/exec/CMakeLists.txt
@@ -25,8 +25,6 @@ set(LIBRARY_OUTPUT_PATH "${BUILD_OUTPUT_ROOT_DIRECTORY}/exec")
 set(EXECUTABLE_OUTPUT_PATH "${BUILD_OUTPUT_ROOT_DIRECTORY}/exec")
 
 add_library(Exec
-  aggregation-node.cc
-  aggregation-node-ir.cc
   analytic-eval-node.cc
   base-sequence-scanner.cc
   blocking-join-node.cc
@@ -39,10 +37,6 @@ add_library(Exec
   exchange-node.cc
   external-data-source-executor.cc
   filter-context.cc
-  hash-join-node.cc
-  hash-join-node-ir.cc
-  old-hash-table.cc
-  old-hash-table-ir.cc
   hash-table.cc
   hbase-table-sink.cc
   hbase-table-writer.cc
@@ -105,7 +99,6 @@ add_library(Exec
 add_dependencies(Exec thrift-deps)
 
 ADD_BE_TEST(zigzag-test)
-ADD_BE_TEST(old-hash-table-test)
 ADD_BE_TEST(hash-table-test)
 ADD_BE_TEST(delimited-text-parser-test)
 ADD_BE_TEST(read-write-util-test)

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/507bd8be/be/src/exec/aggregation-node-ir.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/aggregation-node-ir.cc b/be/src/exec/aggregation-node-ir.cc
deleted file mode 100644
index af13b68..0000000
--- a/be/src/exec/aggregation-node-ir.cc
+++ /dev/null
@@ -1,55 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "exec/aggregation-node.h"
-
-#include "exec/old-hash-table.inline.h"
-#include "runtime/row-batch.h"
-#include "runtime/runtime-state.h"
-#include "runtime/tuple.h"
-#include "runtime/tuple-row.h"
-
-using namespace impala;
-
-// Functions in this file are cross compiled to IR with clang.  These functions
-// are modified at runtime with a query specific codegen'd UpdateAggTuple
-
-AggFnEvaluator* const* AggregationNode::agg_fn_evals() const {
-  return agg_fn_evals_.data();
-}
-
-void AggregationNode::ProcessRowBatchNoGrouping(RowBatch* batch) {
-  for (int i = 0; i < batch->num_rows(); ++i) {
-    UpdateTuple(singleton_intermediate_tuple_, batch->GetRow(i));
-  }
-}
-
-void AggregationNode::ProcessRowBatchWithGrouping(RowBatch* batch) {
-  for (int i = 0; i < batch->num_rows(); ++i) {
-    TupleRow* row = batch->GetRow(i);
-    Tuple* agg_tuple = NULL;
-    OldHashTable::Iterator it = hash_tbl_->Find(row);
-    if (it.AtEnd()) {
-      agg_tuple = ConstructIntermediateTuple();
-      hash_tbl_->Insert(agg_tuple);
-    } else {
-      agg_tuple = it.GetTuple();
-    }
-    UpdateTuple(agg_tuple, row);
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/507bd8be/be/src/exec/aggregation-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/aggregation-node.cc b/be/src/exec/aggregation-node.cc
deleted file mode 100644
index 13f7dd3..0000000
--- a/be/src/exec/aggregation-node.cc
+++ /dev/null
@@ -1,878 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "exec/aggregation-node.h"
-
-#include <math.h>
-#include <sstream>
-#include <boost/functional/hash.hpp>
-#include <thrift/protocol/TDebugProtocol.h>
-
-#include <x86intrin.h>
-
-#include "codegen/codegen-anyval.h"
-#include "codegen/llvm-codegen.h"
-#include "exec/old-hash-table.inline.h"
-#include "exprs/agg-fn-evaluator.h"
-#include "exprs/scalar-expr.h"
-#include "exprs/scalar-expr-evaluator.h"
-#include "exprs/slot-ref.h"
-#include "runtime/descriptors.h"
-#include "runtime/mem-pool.h"
-#include "runtime/raw-value.h"
-#include "runtime/row-batch.h"
-#include "runtime/runtime-state.h"
-#include "runtime/string-value.inline.h"
-#include "runtime/tuple.h"
-#include "runtime/tuple-row.h"
-#include "udf/udf-internal.h"
-#include "util/debug-util.h"
-#include "util/runtime-profile-counters.h"
-
-#include "gen-cpp/Exprs_types.h"
-#include "gen-cpp/PlanNodes_types.h"
-
-#include "common/names.h"
-
-using namespace impala;
-using namespace llvm;
-
-namespace impala {
-
-const char* AggregationNode::LLVM_CLASS_NAME = "class.impala::AggregationNode";
-
-// TODO: pass in maximum size; enforce by setting limit in mempool
-AggregationNode::AggregationNode(ObjectPool* pool, const TPlanNode& tnode,
-    const DescriptorTbl& descs)
-  : ExecNode(pool, tnode, descs),
-    intermediate_tuple_id_(tnode.agg_node.intermediate_tuple_id),
-    intermediate_tuple_desc_(descs.GetTupleDescriptor(intermediate_tuple_id_)),
-    intermediate_row_desc_(pool->Add(new RowDescriptor(intermediate_tuple_desc_, false))),
-    output_tuple_id_(tnode.agg_node.output_tuple_id),
-    output_tuple_desc_(descs.GetTupleDescriptor(output_tuple_id_)),
-    singleton_intermediate_tuple_(nullptr),
-    codegen_process_row_batch_fn_(nullptr),
-    process_row_batch_fn_(nullptr),
-    needs_finalize_(tnode.agg_node.need_finalize),
-    build_timer_(nullptr),
-    get_results_timer_(nullptr),
-    hash_table_buckets_counter_(nullptr) {
-  DCHECK_EQ(intermediate_tuple_desc_->slots().size(), output_tuple_desc_->slots().size());
-}
-
-Status AggregationNode::Init(const TPlanNode& tnode, RuntimeState* state) {
-  DCHECK(intermediate_tuple_desc_ != nullptr);
-  DCHECK(output_tuple_desc_ != nullptr);
-  RETURN_IF_ERROR(ExecNode::Init(tnode, state));
-
-  const RowDescriptor& row_desc = *child(0)->row_desc();
-  RETURN_IF_ERROR(ScalarExpr::Create(tnode.agg_node.grouping_exprs, row_desc, state,
-      &grouping_exprs_));
-  for (int i = 0; i < grouping_exprs_.size(); ++i) {
-    SlotDescriptor* desc = intermediate_tuple_desc_->slots()[i];
-    DCHECK(desc->type().type == TYPE_NULL ||
-        desc->type() == grouping_exprs_[i]->type());
-    // TODO: Generate the build exprs in the FE such that the existing logic
-    // for handling NULL_TYPE works.
-    SlotRef* build_expr = pool_->Add(desc->type().type != TYPE_NULL ?
-        new SlotRef(desc) : new SlotRef(desc, TYPE_BOOLEAN));
-    build_exprs_.push_back(build_expr);
-    RETURN_IF_ERROR(build_expr->Init(*intermediate_row_desc_, state));
-  }
-
-  int j = grouping_exprs_.size();
-  for (int i = 0; i < tnode.agg_node.aggregate_functions.size(); ++i, ++j) {
-    SlotDescriptor* intermediate_slot_desc = intermediate_tuple_desc_->slots()[j];
-    SlotDescriptor* output_slot_desc = output_tuple_desc_->slots()[j];
-    AggFn* agg_fn;
-    RETURN_IF_ERROR(AggFn::Create(tnode.agg_node.aggregate_functions[i], row_desc,
-        *intermediate_slot_desc, *output_slot_desc, state, &agg_fn));
-    agg_fns_.push_back(agg_fn);
-  }
-  return Status::OK();
-}
-
-Status AggregationNode::Prepare(RuntimeState* state) {
-  DCHECK(output_iterator_.AtEnd());
-  SCOPED_TIMER(runtime_profile_->total_time_counter());
-  RETURN_IF_ERROR(ExecNode::Prepare(state));
-
-  tuple_pool_.reset(new MemPool(mem_tracker()));
-  agg_fn_pool_.reset(new MemPool(expr_mem_tracker()));
-  build_timer_ = ADD_TIMER(runtime_profile(), "BuildTime");
-  get_results_timer_ = ADD_TIMER(runtime_profile(), "GetResultsTime");
-  hash_table_buckets_counter_ =
-      ADD_COUNTER(runtime_profile(), "BuildBuckets", TUnit::UNIT);
-  hash_table_load_factor_counter_ =
-      ADD_COUNTER(runtime_profile(), "LoadFactor", TUnit::DOUBLE_VALUE);
-
-  RETURN_IF_ERROR(AggFnEvaluator::Create(agg_fns_, state, pool_, agg_fn_pool_.get(),
-      &agg_fn_evals_));
-  DCHECK_EQ(agg_fns_.size(), agg_fn_evals_.size());
-
-  // TODO: how many buckets?
-  vector<ScalarExpr*>* filter_exprs = pool_->Add(new vector<ScalarExpr*>());
-  RETURN_IF_ERROR(OldHashTable::Create(pool_, state, build_exprs_, grouping_exprs_,
-      *filter_exprs, 1, true, vector<bool>(build_exprs_.size(), true),
-      id(), mem_tracker(), vector<RuntimeFilter*>(), &hash_tbl_, true));
-  AddCodegenDisabledMessage(state);
-  return Status::OK();
-}
-
-void AggregationNode::Codegen(RuntimeState* state) {
-  DCHECK(state->ShouldCodegen());
-  ExecNode::Codegen(state);
-  if (IsNodeCodegenDisabled()) return;
-
-  bool codegen_enabled = false;
-  LlvmCodeGen* codegen = state->codegen();
-  DCHECK(codegen != nullptr);
-  Function* update_tuple_fn = CodegenUpdateTuple(codegen);
-  if (update_tuple_fn != nullptr) {
-    codegen_process_row_batch_fn_ = CodegenProcessRowBatch(codegen, update_tuple_fn);
-    if (codegen_process_row_batch_fn_ != nullptr) {
-      // Update to using codegen'd process row batch.
-      codegen->AddFunctionToJit(codegen_process_row_batch_fn_,
-          reinterpret_cast<void**>(&process_row_batch_fn_));
-      codegen_enabled = true;
-    }
-  }
-  runtime_profile()->AddCodegenMsg(codegen_enabled);
-}
-
-Status AggregationNode::Open(RuntimeState* state) {
-  SCOPED_TIMER(runtime_profile_->total_time_counter());
-  RETURN_IF_ERROR(ExecNode::Open(state));
-  RETURN_IF_ERROR(hash_tbl_->Open(state));
-  RETURN_IF_ERROR(AggFnEvaluator::Open(agg_fn_evals_, state));
-
-  if (grouping_exprs_.empty()) {
-    // Create single intermediate tuple. This must happen after
-    // opening the aggregate evaluators.
-    singleton_intermediate_tuple_ = ConstructIntermediateTuple();
-    // Check for failures during AggFnEvaluator::Init().
-    RETURN_IF_ERROR(state->GetQueryStatus());
-    hash_tbl_->Insert(singleton_intermediate_tuple_);
-  }
-
-  RETURN_IF_ERROR(children_[0]->Open(state));
-
-  RowBatch batch(children_[0]->row_desc(), state->batch_size(), mem_tracker());
-  int64_t num_input_rows = 0;
-  while (true) {
-    bool eos;
-    RETURN_IF_CANCELLED(state);
-    RETURN_IF_ERROR(QueryMaintenance(state));
-    RETURN_IF_ERROR(children_[0]->GetNext(state, &batch, &eos));
-    SCOPED_TIMER(build_timer_);
-
-    if (VLOG_ROW_IS_ON) {
-      for (int i = 0; i < batch.num_rows(); ++i) {
-        TupleRow* row = batch.GetRow(i);
-        VLOG_ROW << "input row: " << PrintRow(row, *children_[0]->row_desc());
-      }
-    }
-    if (process_row_batch_fn_ != nullptr) {
-      process_row_batch_fn_(this, &batch);
-    } else if (grouping_exprs_.empty()) {
-      ProcessRowBatchNoGrouping(&batch);
-    } else {
-      ProcessRowBatchWithGrouping(&batch);
-    }
-    COUNTER_SET(hash_table_buckets_counter_, hash_tbl_->num_buckets());
-    COUNTER_SET(hash_table_load_factor_counter_, hash_tbl_->load_factor());
-    num_input_rows += batch.num_rows();
-    // We must set output_iterator_ here, rather than outside the loop, because
-    // output_iterator_ must be set if the function returns within the loop
-    output_iterator_ = hash_tbl_->Begin();
-
-    batch.Reset();
-    RETURN_IF_ERROR(QueryMaintenance(state));
-    if (eos) break;
-  }
-
-  // We have consumed all of the input from the child and transfered ownership of the
-  // resources we need, so the child can be closed safely to release its resources.
-  child(0)->Close(state);
-  VLOG_FILE << "aggregated " << num_input_rows << " input rows into "
-            << hash_tbl_->size() << " output rows";
-  return Status::OK();
-}
-
-Status AggregationNode::GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos) {
-  SCOPED_TIMER(runtime_profile_->total_time_counter());
-  RETURN_IF_ERROR(ExecDebugAction(TExecNodePhase::GETNEXT, state));
-  RETURN_IF_CANCELLED(state);
-  RETURN_IF_ERROR(QueryMaintenance(state));
-  SCOPED_TIMER(get_results_timer_);
-
-  if (ReachedLimit()) {
-    *eos = true;
-    return Status::OK();
-  }
-  *eos = false;
-  ScalarExprEvaluator* const* evals = conjunct_evals_.data();
-  int num_conjuncts = conjuncts_.size();
-  DCHECK_EQ(num_conjuncts, conjunct_evals_.size());
-
-  int count = 0;
-  const int N = state->batch_size();
-  while (!output_iterator_.AtEnd() && !row_batch->AtCapacity()) {
-    // This loop can go on for a long time if the conjuncts are very selective. Do query
-    // maintenance every N iterations.
-    if (count++ % N == 0) {
-      RETURN_IF_CANCELLED(state);
-      RETURN_IF_ERROR(QueryMaintenance(state));
-    }
-    int row_idx = row_batch->AddRow();
-    TupleRow* row = row_batch->GetRow(row_idx);
-    Tuple* intermediate_tuple = output_iterator_.GetTuple();
-    Tuple* output_tuple = FinalizeTuple(intermediate_tuple, row_batch->tuple_data_pool());
-    output_iterator_.Next<false>();
-    row->SetTuple(0, output_tuple);
-    if (ExecNode::EvalConjuncts(evals, num_conjuncts, row)) {
-      VLOG_ROW << "output row: " << PrintRow(row, *row_desc());
-      row_batch->CommitLastRow();
-      ++num_rows_returned_;
-      if (ReachedLimit()) break;
-    }
-  }
-  *eos = output_iterator_.AtEnd() || ReachedLimit();
-  COUNTER_SET(rows_returned_counter_, num_rows_returned_);
-  return Status::OK();
-}
-
-Status AggregationNode::Reset(RuntimeState* state) {
-  DCHECK(false) << "NYI";
-  return Status("NYI");
-}
-
-void AggregationNode::Close(RuntimeState* state) {
-  if (is_closed()) return;
-
-  // Iterate through the remaining rows in the hash table and call Serialize/Finalize on
-  // them in order to free any memory allocated by UDAs. Finalize() requires a dst tuple
-  // but we don't actually need the result, so allocate a single dummy tuple to avoid
-  // accumulating memory.
-  Tuple* dummy_dst = nullptr;
-  // 'tuple_pool_' can be NULL if Prepare() failed.
-  if (needs_finalize_ && tuple_pool_.get() != nullptr) {
-    dummy_dst = Tuple::Create(output_tuple_desc_->byte_size(), tuple_pool_.get());
-  }
-  while (!output_iterator_.AtEnd()) {
-    Tuple* tuple = output_iterator_.GetTuple();
-    if (needs_finalize_) {
-      AggFnEvaluator::Finalize(agg_fn_evals_, tuple, dummy_dst);
-    } else {
-      AggFnEvaluator::Serialize(agg_fn_evals_, tuple);
-    }
-    output_iterator_.Next<false>();
-  }
-
-  if (tuple_pool_.get() != nullptr) tuple_pool_->FreeAll();
-  if (hash_tbl_.get() != nullptr) hash_tbl_->Close(state);
-
-  AggFnEvaluator::Close(agg_fn_evals_, state);
-  agg_fn_evals_.clear();
-  AggFn::Close(agg_fns_);
-  if (agg_fn_pool_.get() != nullptr) agg_fn_pool_->FreeAll();
-
-  ScalarExpr::Close(grouping_exprs_);
-  ScalarExpr::Close(build_exprs_);
-  ExecNode::Close(state);
-}
-
-Status AggregationNode::QueryMaintenance(RuntimeState* state) {
-  if (hash_tbl_.get() != nullptr) hash_tbl_->FreeLocalAllocations();
-  return ExecNode::QueryMaintenance(state);
-}
-
-Tuple* AggregationNode::ConstructIntermediateTuple() {
-  Tuple* intermediate_tuple = Tuple::Create(
-      intermediate_tuple_desc_->byte_size(), tuple_pool_.get());
-  vector<SlotDescriptor*>::const_iterator slot_desc =
-      intermediate_tuple_desc_->slots().begin();
-
-  // copy grouping values
-  for (int i = 0; i < grouping_exprs_.size(); ++i, ++slot_desc) {
-    if (hash_tbl_->last_expr_value_null(i)) {
-      intermediate_tuple->SetNull((*slot_desc)->null_indicator_offset());
-    } else {
-      void* src = hash_tbl_->last_expr_value(i);
-      void* dst = intermediate_tuple->GetSlot((*slot_desc)->tuple_offset());
-      RawValue::Write(src, dst, (*slot_desc)->type(), tuple_pool_.get());
-    }
-  }
-
-  // Initialize aggregate output.
-  DCHECK_EQ(agg_fns_.size(), agg_fn_evals_.size());
-  for (int i = 0; i < agg_fns_.size(); ++i, ++slot_desc) {
-    AggFnEvaluator* eval = agg_fn_evals_[i];
-    eval->Init(intermediate_tuple);
-    // Codegen specific path.
-    // To minimize branching on the UpdateTuple path, initialize the result value
-    // so that UpdateTuple doesn't have to check if the aggregation
-    // dst slot is null.
-    //  - sum/count: 0
-    //  - min: max_value
-    //  - max: min_value
-    // TODO: remove when we don't use the irbuilder for codegen here.
-    // This optimization no longer applies with AnyVal
-    if ((*slot_desc)->type().type != TYPE_STRING &&
-        (*slot_desc)->type().type != TYPE_VARCHAR &&
-        (*slot_desc)->type().type != TYPE_TIMESTAMP &&
-        (*slot_desc)->type().type != TYPE_CHAR &&
-        (*slot_desc)->type().type != TYPE_DECIMAL) {
-      ExprValue default_value;
-      void* default_value_ptr = nullptr;
-      switch (agg_fns_[i]->agg_op()) {
-        case AggFn::MIN:
-          default_value_ptr = default_value.SetToMax((*slot_desc)->type());
-          RawValue::Write(default_value_ptr, intermediate_tuple, *slot_desc, nullptr);
-          break;
-        case AggFn::MAX:
-          default_value_ptr = default_value.SetToMin((*slot_desc)->type());
-          RawValue::Write(default_value_ptr, intermediate_tuple, *slot_desc, nullptr);
-          break;
-        default:
-          break;
-      }
-    }
-  }
-  return intermediate_tuple;
-}
-
-void AggregationNode::UpdateTuple(Tuple* tuple, TupleRow* row) {
-  DCHECK(tuple != nullptr || agg_fn_evals_.empty());
-  AggFnEvaluator::Add(agg_fn_evals_, row, tuple);
-}
-
-Tuple* AggregationNode::FinalizeTuple(Tuple* tuple, MemPool* pool) {
-  DCHECK(tuple != nullptr || agg_fn_evals_.empty());
-  DCHECK(output_tuple_desc_ != nullptr);
-
-  Tuple* dst = tuple;
-  if (needs_finalize_ && intermediate_tuple_id_ != output_tuple_id_) {
-    dst = Tuple::Create(output_tuple_desc_->byte_size(), pool);
-  }
-  if (needs_finalize_) {
-    AggFnEvaluator::Finalize(agg_fn_evals_, tuple, dst);
-  } else {
-    AggFnEvaluator::Serialize(agg_fn_evals_, tuple);
-  }
-  // Copy grouping values from tuple to dst.
-  // TODO: Codegen this.
-  if (dst != tuple) {
-    int num_grouping_slots = grouping_exprs_.size();
-    for (int i = 0; i < num_grouping_slots; ++i) {
-      SlotDescriptor* src_slot_desc = intermediate_tuple_desc_->slots()[i];
-      SlotDescriptor* dst_slot_desc = output_tuple_desc_->slots()[i];
-      bool src_slot_null = tuple->IsNull(src_slot_desc->null_indicator_offset());
-      void* src_slot = nullptr;
-      if (!src_slot_null) src_slot = tuple->GetSlot(src_slot_desc->tuple_offset());
-      RawValue::Write(src_slot, dst, dst_slot_desc, nullptr);
-    }
-  }
-  return dst;
-}
-
-void AggregationNode::DebugString(int indentation_level, stringstream* out) const {
-  *out << string(indentation_level * 2, ' ');
-  *out << "AggregationNode("
-       << "intermediate_tuple_id=" << intermediate_tuple_id_
-       << " output_tuple_id=" << output_tuple_id_
-       << " needs_finalize=" << needs_finalize_
-       << " grouping_exprs=" << ScalarExpr::DebugString(grouping_exprs_)
-       << " agg_exprs=" << AggFn::DebugString(agg_fns_);
-  ExecNode::DebugString(indentation_level, out);
-  *out << ")";
-}
-
-IRFunction::Type GetHllUpdateFunction2(const ColumnType& type) {
-  switch (type.type) {
-    case TYPE_BOOLEAN: return IRFunction::HLL_UPDATE_BOOLEAN;
-    case TYPE_TINYINT: return IRFunction::HLL_UPDATE_TINYINT;
-    case TYPE_SMALLINT: return IRFunction::HLL_UPDATE_SMALLINT;
-    case TYPE_INT: return IRFunction::HLL_UPDATE_INT;
-    case TYPE_BIGINT: return IRFunction::HLL_UPDATE_BIGINT;
-    case TYPE_FLOAT: return IRFunction::HLL_UPDATE_FLOAT;
-    case TYPE_DOUBLE: return IRFunction::HLL_UPDATE_DOUBLE;
-    case TYPE_STRING: return IRFunction::HLL_UPDATE_STRING;
-    case TYPE_DECIMAL: return IRFunction::HLL_UPDATE_DECIMAL;
-    default:
-      DCHECK(false) << "Unsupported type: " << type;
-      return IRFunction::FN_END;
-  }
-}
-
-// IR Generation for updating a single aggregation slot. Signature is:
-// void UpdateSlot(FunctionContext* fn_ctx, ScalarExprEvaluator* expr_eval,
-//     AggTuple* agg_tuple, char** row)
-//
-// The IR for sum(double_col) is:
-//
-// define void @UpdateSlot(%"class.impala::AggFnEvaluator"* %agg_fn_eval,
-//     <{ double, i8 }>* %agg_tuple, %"class.impala::TupleRow"* %row) #32 {
-// entry:
-//   %input_evals_vector = call %"class.impala::ScalarExprEvaluator"**
-//       @_ZNK6impala14AggFnEvaluator11input_evalsEv(
-//           %"class.impala::AggFnEvaluator"* %agg_fn_eval)
-//   %0 = getelementptr %"class.impala::ScalarExprEvaluator"*,
-//       %"class.impala::ScalarExprEvaluator"** %input_evals_vector, i32 0
-//   %input_eval = load %"class.impala::ScalarExprEvaluator"*,
-//       %"class.impala::ScalarExprEvaluator"** %0
-//   %src = call { i8, double } @GetSlotRef(
-//       %"class.impala::ScalarExprEvaluator"* %input_eval,
-//       %"class.impala::TupleRow"* %row)
-//   %1 = extractvalue { i8, double } %src, 0
-//   %is_null = trunc i8 %1 to i1
-//   br i1 %is_null, label %ret, label %src_not_null
-//
-// src_not_null:                                     ; preds = %entry
-//   %dst_slot_ptr = getelementptr inbounds <{ double, i8 }>,
-//       <{ double, i8 }>* %agg_tuple, i32 0, i32 0
-//   %2 = bitcast <{ double, i8 }>* %agg_tuple to i8*
-//   %null_byte_ptr = getelementptr inbounds i8, i8* %2, i32 8
-//   %null_byte = load i8, i8* %null_byte_ptr
-//   %null_bit_cleared = and i8 %null_byte, -2
-//   store i8 %null_bit_cleared, i8* %null_byte_ptr
-//   %dst_val = load double, double* %dst_slot_ptr
-//   %val = extractvalue { i8, double } %src, 1
-//   %3 = fadd double %dst_val, %val
-//   store double %3, double* %dst_slot_ptr
-//   br label %ret
-//
-// ret:                                              ; preds = %src_not_null, %entry
-//   ret void
-// }
-//
-// The IR for ndv(double_col) is:
-//
-// define void @UpdateSlot(%"class.impala::AggFnEvaluator"* %agg_fn_eval,
-//     <{ %"struct.impala::StringValue" }>* %agg_tuple,
-//     %"class.impala::TupleRow"* %row) #32 {
-// entry:
-//   %dst_lowered_ptr = alloca { i64, i8* }
-//   %0 = alloca { i8, double }
-//   %input_evals_vector = call %"class.impala::ScalarExprEvaluator"**
-//       @_ZNK6impala14AggFnEvaluator11input_evalsEv(
-//           %"class.impala::AggFnEvaluator"* %agg_fn_eval)
-//   %1 = getelementptr %"class.impala::ScalarExprEvaluator"*,
-//       %"class.impala::ScalarExprEvaluator"** %input_evals_vector, i32 0
-//   %input_eval = load %"class.impala::ScalarExprEvaluator"*,
-//       %"class.impala::ScalarExprEvaluator"** %1
-//   %src = call { i8, double } @GetSlotRef(
-//       %"class.impala::ScalarExprEvaluator"* %input_eval,
-//           %"class.impala::TupleRow"* %row)
-//   %2 = extractvalue { i8, double } %src, 0
-//   %is_null = trunc i8 %2 to i1
-//   br i1 %is_null, label %ret, label %src_not_null
-//
-// src_not_null:                                     ; preds = %entry
-//   %dst_slot_ptr = getelementptr inbounds <{ %"struct.impala::StringValue" }>,
-//       <{ %"struct.impala::StringValue" }>* %agg_tuple, i32 0, i32 0
-//   %dst_val =
-//       load %"struct.impala::StringValue", %"struct.impala::StringValue"* %dst_slot_ptr
-//   store { i8, double } %src, { i8, double }* %0
-//   %src_unlowered_ptr = bitcast { i8, double }* %0 to %"struct.impala_udf::DoubleVal"*
-//   %ptr = extractvalue %"struct.impala::StringValue" %dst_val, 0
-//   %dst_stringval = insertvalue { i64, i8* } zeroinitializer, i8* %ptr, 1
-//   %len = extractvalue %"struct.impala::StringValue" %dst_val, 1
-//   %3 = extractvalue { i64, i8* } %dst_stringval, 0
-//   %4 = zext i32 %len to i64
-//   %5 = shl i64 %4, 32
-//   %6 = and i64 %3, 4294967295
-//   %7 = or i64 %6, %5
-//   %dst_stringval1 = insertvalue { i64, i8* } %dst_stringval, i64 %7, 0
-//   store { i64, i8* } %dst_stringval1, { i64, i8* }* %dst_lowered_ptr
-//   %dst_unlowered_ptr =
-//       bitcast { i64, i8* }* %dst_lowered_ptr to %"struct.impala_udf::StringVal"*
-//   %agg_fn_ctx_arg = call %"class.impala_udf::FunctionContext"*
-//       @_ZNK6impala14AggFnEvaluator10agg_fn_ctxEv(
-//            %"class.impala::AggFnEvaluator"* %agg_fn_eval)
-//   call void
-//       @_ZN6impala18AggregateFunctions9HllUpdateIN10impala_udf9DoubleValEEEvPNS2_15FunctionContextERKT_PNS2_9StringValE(
-//           %"class.impala_udf::FunctionContext"* %agg_fn_ctx_arg,
-//           %"struct.impala_udf::DoubleVal"* %src_unlowered_ptr,
-//           %"struct.impala_udf::StringVal"* %dst_unlowered_ptr)
-//   %anyval_result = load { i64, i8* }, { i64, i8* }* %dst_lowered_ptr
-//   %8 = extractvalue { i64, i8* } %anyval_result, 0
-//   %9 = ashr i64 %8, 32
-//   %10 = trunc i64 %9 to i32
-//   %11 = insertvalue %"struct.impala::StringValue" zeroinitializer, i32 %10, 1
-//   %12 = extractvalue { i64, i8* } %anyval_result, 1
-//   %13 = insertvalue %"struct.impala::StringValue" %11, i8* %12, 0
-//   store %"struct.impala::StringValue" %13, %"struct.impala::StringValue"* %dst_slot_ptr
-//   br label %ret
-//
-// ret:                                              ; preds = %src_not_null, %entry
-//   ret void
-// }
-//
-llvm::Function* AggregationNode::CodegenUpdateSlot(LlvmCodeGen* codegen,
-    int agg_fn_idx, SlotDescriptor* slot_desc) {
-  AggFn* agg_fn = agg_fns_[agg_fn_idx];
-  ScalarExpr* input_expr = agg_fn->GetChild(0);
-  // TODO: Fix this DCHECK and Init() once CodegenUpdateSlot() can handle AggFnEvaluator
-  // with multiple input expressions (e.g. group_concat).
-  DCHECK_EQ(agg_fn->GetNumChildren(), 1);
-  // TODO: implement timestamp
-  if (input_expr->type().type == TYPE_TIMESTAMP) return nullptr;
-
-  // Codegen the input expression's GetValue() function.
-  llvm::Function* input_expr_fn;
-  Status status = input_expr->GetCodegendComputeFn(codegen, &input_expr_fn);
-  if (!status.ok()) {
-    VLOG_QUERY << "Could not codegen UpdateSlot(): " << status.GetDetail();
-    return nullptr;
-  }
-  DCHECK(input_expr_fn != nullptr);
-
-  // Create the types of the UpdateSlot()'s arguments.
-  PointerType* agg_fn_eval_type =
-      codegen->GetPtrType(AggFnEvaluator::LLVM_CLASS_NAME);
-  StructType* tuple_struct = intermediate_tuple_desc_->GetLlvmStruct(codegen);
-  if (tuple_struct == nullptr) {
-    VLOG_QUERY << "Could not codegen UpdateSlot(): could not generate tuple struct.";
-    return nullptr;
-  }
-  PointerType* tuple_ptr_type = codegen->GetPtrType(tuple_struct);
-  PointerType* tuple_row_ptr_type = codegen->GetPtrType(TupleRow::LLVM_CLASS_NAME);
-
-  // Create UpdateSlot() prototype
-  LlvmCodeGen::FnPrototype prototype(codegen, "UpdateSlot", codegen->void_type());
-  prototype.AddArgument(
-      LlvmCodeGen::NamedVariable("agg_fn_eval", agg_fn_eval_type));
-  prototype.AddArgument(LlvmCodeGen::NamedVariable("agg_tuple", tuple_ptr_type));
-  prototype.AddArgument(LlvmCodeGen::NamedVariable("row", tuple_row_ptr_type));
-
-  LlvmBuilder builder(codegen->context());
-  Value* args[3];
-  Function* fn = prototype.GeneratePrototype(&builder, &args[0]);
-  Value* agg_fn_eval_arg = args[0];
-  Value* agg_tuple_arg = args[1];
-  Value* row_arg = args[2];
-
-  BasicBlock* src_not_null_block =
-      BasicBlock::Create(codegen->context(), "src_not_null", fn);
-  BasicBlock* ret_block = BasicBlock::Create(codegen->context(), "ret", fn);
-
-  // Get the first input expression's evaluator. This assumes there is only one
-  // input to the agg_fn. See DCHECK at the beginning of this function for it.
-  Value* input_evals_vector = codegen->CodegenCallFunction(&builder,
-      IRFunction::AGG_FN_EVALUATOR_INPUT_EVALUATORS, agg_fn_eval_arg,
-      "input_evals_vector");
-  Value* input_eval =
-      codegen->CodegenArrayAt(&builder, input_evals_vector, 0, "input_eval");
-
-  // Call expr function to get src slot value
-  CodegenAnyVal src = CodegenAnyVal::CreateCallWrapped(codegen, &builder,
-      input_expr->type(), input_expr_fn, {input_eval, row_arg}, "src");
-
-  Value* src_is_null = src.GetIsNull();
-  builder.CreateCondBr(src_is_null, ret_block, src_not_null_block);
-
-  // Src slot is not null, update dst_slot
-  builder.SetInsertPoint(src_not_null_block);
-  Value* dst_ptr = builder.CreateStructGEP(nullptr, agg_tuple_arg,
-      slot_desc->llvm_field_idx(), "dst_slot_ptr");
-  Value* result = nullptr;
-
-  if (slot_desc->is_nullable()) {
-    // Dst is nullptr, just update dst slot to src slot and clear null bit
-    slot_desc->CodegenSetNullIndicator(
-        codegen, &builder, agg_tuple_arg, codegen->false_value());
-  }
-
-  // Update the slot
-  Value* dst_value = builder.CreateLoad(dst_ptr, "dst_val");
-  switch (agg_fn->agg_op()) {
-    case AggFn::COUNT:
-      if (agg_fn->is_merge()) {
-        result = builder.CreateAdd(dst_value, src.GetVal(), "count_sum");
-      } else {
-        result = builder.CreateAdd(dst_value,
-            codegen->GetIntConstant(TYPE_BIGINT, 1), "count_inc");
-      }
-      break;
-    case AggFn::MIN: {
-      Function* min_fn = codegen->CodegenMinMax(slot_desc->type(), true);
-      Value* min_args[] = { dst_value, src.GetVal() };
-      result = builder.CreateCall(min_fn, min_args, "min_value");
-      break;
-    }
-    case AggFn::MAX: {
-      Function* max_fn = codegen->CodegenMinMax(slot_desc->type(), false);
-      Value* max_args[] = { dst_value, src.GetVal() };
-      result = builder.CreateCall(max_fn, max_args, "max_value");
-      break;
-    }
-    case AggFn::SUM:
-      if (slot_desc->type().type == TYPE_FLOAT || slot_desc->type().type == TYPE_DOUBLE) {
-        result = builder.CreateFAdd(dst_value, src.GetVal());
-      } else {
-        result = builder.CreateAdd(dst_value, src.GetVal());
-      }
-      break;
-    case AggFn::NDV: {
-      DCHECK_EQ(slot_desc->type().type, TYPE_STRING);
-      IRFunction::Type ir_function_type = agg_fn->is_merge() ? IRFunction::HLL_MERGE
-                                          : GetHllUpdateFunction2(input_expr->type());
-      Function* hll_fn = codegen->GetFunction(ir_function_type, false);
-
-      // Create pointer to src_anyval to pass to HllUpdate() function. We must use the
-      // unlowered type.
-      Value* src_unlowered_ptr = src.GetUnloweredPtr("src_unlowered_ptr");
-
-      // Create StringVal* intermediate argument from dst_value
-      CodegenAnyVal dst_stringval =
-          CodegenAnyVal::GetNonNullVal(codegen, &builder, TYPE_STRING, "dst_stringval");
-      dst_stringval.SetFromRawValue(dst_value);
-
-      // Create pointer to dst_stringval to pass to HllUpdate() function. We must use
-      // the unlowered type.
-      Value* dst_lowered_ptr = dst_stringval.GetLoweredPtr("dst_lowered_ptr");
-      Type* dst_unlowered_ptr_type =
-          codegen->GetPtrType(CodegenAnyVal::GetUnloweredType(codegen, TYPE_STRING));
-      Value* dst_unlowered_ptr = builder.CreateBitCast(
-          dst_lowered_ptr, dst_unlowered_ptr_type, "dst_unlowered_ptr");
-
-      // Get the FunctionContext object for the AggFnEvaluator.
-      Value* agg_fn_ctx_arg = codegen->CodegenCallFunction(&builder,
-          IRFunction::AGG_FN_EVALUATOR_AGG_FN_CTX, agg_fn_eval_arg,
-          "agg_fn_ctx_arg");
-
-      // Call 'hll_fn'
-      builder.CreateCall(hll_fn, {agg_fn_ctx_arg, src_unlowered_ptr, dst_unlowered_ptr});
-
-      // Convert StringVal intermediate 'dst_arg' back to StringValue
-      Value* anyval_result = builder.CreateLoad(dst_lowered_ptr, "anyval_result");
-      result = CodegenAnyVal(codegen, &builder, TYPE_STRING, anyval_result)
-               .ToNativeValue();
-      break;
-    }
-    default:
-      DCHECK(false) << "bad aggregate operator: " << agg_fn->agg_op();
-  }
-
-  builder.CreateStore(result, dst_ptr);
-  builder.CreateBr(ret_block);
-
-  builder.SetInsertPoint(ret_block);
-  builder.CreateRetVoid();
-
-  return codegen->FinalizeFunction(fn);
-}
-
-// IR codegen for the UpdateTuple loop.  This loop is query specific and
-// based on the aggregate functions.  The function signature must match the non-
-// codegen'd UpdateTuple exactly.
-// For the query:
-// select count(*), count(int_col), sum(double_col) the IR looks like:
-//
-// define void @UpdateTuple(%"class.impala::AggregationNode"* %this_ptr,
-//     %"class.impala::Tuple"* %agg_tuple, %"class.impala::TupleRow"* %tuple_row) #32 {
-// entry:
-//   %tuple = bitcast %"class.impala::Tuple"* %agg_tuple to <{ i64, i64, double, i8 }>*
-//   %agg_fn_evals = call %"class.impala::AggFnEvaluator"**
-//       @_ZNK6impala15AggregationNode12agg_fn_evalsEv(
-//           %"class.impala::AggregationNode"* %this_ptr)
-//   %src_slot = getelementptr inbounds <{ i64, i64, double, i8 }>,
-//       <{ i64, i64, double, i8 }>* %tuple, i32 0, i32 0
-//   %count_star_val = load i64, i64* %src_slot
-//   %count_star_inc = add i64 %count_star_val, 1
-//   store i64 %count_star_inc, i64* %src_slot
-//   %0 = getelementptr %"class.impala::AggFnEvaluator"*,
-//       %"class.impala::AggFnEvaluator"** %agg_fn_evals, i32 1
-//   %agg_fn_eval =
-//       load %"class.impala::AggFnEvaluator"*, %"class.impala::AggFnEvaluator"** %0
-//   call void @UpdateSlot(%"class.impala::AggFnEvaluator"* %agg_fn_eval,
-//       <{ i64, i64, double, i8 }>* %tuple, %"class.impala::TupleRow"* %tuple_row)
-//   %1 = getelementptr %"class.impala::AggFnEvaluator"*,
-//       %"class.impala::AggFnEvaluator"** %agg_fn_evals, i32 2
-//   %agg_fn_eval1 =
-//       load %"class.impala::AggFnEvaluator"*, %"class.impala::AggFnEvaluator"** %1
-//   call void @UpdateSlot.3(%"class.impala::AggFnEvaluator"* %agg_fn_eval1,
-//       <{ i64, i64, double, i8 }>* %tuple, %"class.impala::TupleRow"* %tuple_row)
-//   ret void
-// }
-//
-Function* AggregationNode::CodegenUpdateTuple(LlvmCodeGen* codegen) {
-  SCOPED_TIMER(codegen->codegen_timer());
-
-  int j = grouping_exprs_.size();
-  for (int i = 0; i < agg_fns_.size(); ++i, ++j) {
-    SlotDescriptor* slot_desc = intermediate_tuple_desc_->slots()[j];
-    AggFn* agg_fn = agg_fns_[i];
-
-    // Timestamp and char are never supported. NDV supports decimal and string but no
-    // other functions.
-    // TODO: the other aggregate functions might work with decimal as-is
-    if (slot_desc->type().type == TYPE_TIMESTAMP || slot_desc->type().type == TYPE_CHAR ||
-        (agg_fn->agg_op() != AggFn::NDV &&
-         (slot_desc->type().type == TYPE_DECIMAL ||
-          slot_desc->type().type == TYPE_STRING ||
-          slot_desc->type().type == TYPE_VARCHAR))) {
-      VLOG_QUERY << "Could not codegen UpdateIntermediateTuple because "
-                 << "string, char, timestamp and decimal are not yet supported.";
-      return nullptr;
-    }
-
-    // Don't codegen things that aren't builtins (for now)
-    if (!agg_fn->is_builtin()) return nullptr;
-  }
-
-  if (intermediate_tuple_desc_->GetLlvmStruct(codegen) == nullptr) {
-    VLOG_QUERY << "Could not codegen UpdateTuple because we could"
-               << "not generate a matching llvm struct for the intermediate tuple.";
-    return nullptr;
-  }
-
-  // Get the types to match the UpdateTuple signature
-  Type* agg_node_type = codegen->GetType(AggregationNode::LLVM_CLASS_NAME);
-  Type* agg_tuple_type = codegen->GetType(Tuple::LLVM_CLASS_NAME);
-  Type* tuple_row_type = codegen->GetType(TupleRow::LLVM_CLASS_NAME);
-
-  DCHECK(agg_node_type != nullptr);
-  DCHECK(agg_tuple_type != nullptr);
-  DCHECK(tuple_row_type != nullptr);
-
-  PointerType* agg_node_ptr_type = codegen->GetPtrType(agg_node_type);
-  PointerType* agg_tuple_ptr_type = codegen->GetPtrType(agg_tuple_type);
-  PointerType* tuple_row_ptr_type = codegen->GetPtrType(tuple_row_type);
-
-  // Signature for UpdateTuple is
-  // void UpdateTuple(AggregationNode* this, FunctionContext** fn_ctx,
-  //     ScalarExprEvaluator** expr_eval, Tuple* tuple, TupleRow* row)
-  // This signature needs to match the non-codegen'd signature exactly.
-  StructType* tuple_struct = intermediate_tuple_desc_->GetLlvmStruct(codegen);
-  if (tuple_struct == nullptr) {
-    VLOG_QUERY << "Could not codegen UpdateSlot(): could not generate tuple struct.";
-    return nullptr;
-  }
-  PointerType* tuple_ptr = PointerType::get(tuple_struct, 0);
-  LlvmCodeGen::FnPrototype prototype(codegen, "UpdateTuple", codegen->void_type());
-  prototype.AddArgument(LlvmCodeGen::NamedVariable("this_ptr", agg_node_ptr_type));
-  prototype.AddArgument(LlvmCodeGen::NamedVariable("agg_tuple", agg_tuple_ptr_type));
-  prototype.AddArgument(LlvmCodeGen::NamedVariable("tuple_row", tuple_row_ptr_type));
-
-  LlvmBuilder builder(codegen->context());
-  Value* args[3];
-  Function* fn = prototype.GeneratePrototype(&builder, &args[0]);
-
-  // Cast the parameter types to the internal llvm runtime types.
-  // TODO: get rid of this by using right type in function signature
-  Value* this_arg = args[0];
-  Value* agg_tuple_arg = builder.CreateBitCast(args[1], tuple_ptr, "tuple");
-  Value* row_arg = args[2];
-
-  // Load &agg_fn_evals_[0]
-  Value* agg_fn_evals_vector = codegen->CodegenCallFunction(&builder,
-      IRFunction::AGG_NODE_GET_AGG_FN_EVALUATORS, this_arg, "agg_fn_evals");
-
-  // Loop over each expr and generate the IR for that slot. If the expr is not
-  // count(*), generate a helper IR function to update the slot and call that.
-  j = grouping_exprs_.size();
-  for (int i = 0; i < agg_fns_.size(); ++i, ++j) {
-    SlotDescriptor* slot_desc = intermediate_tuple_desc_->slots()[j];
-    AggFn* agg_fn = agg_fns_[i];
-    if (agg_fn->is_count_star()) {
-      // TODO: we should be able to hoist this up to the loop over the batch and just
-      // increment the slot by the number of rows in the batch.
-      int field_idx = slot_desc->llvm_field_idx();
-      Value* const_one = codegen->GetIntConstant(TYPE_BIGINT, 1);
-      Value* slot_ptr = builder.CreateStructGEP(nullptr, agg_tuple_arg, field_idx,
-          "src_slot");
-      Value* slot_loaded = builder.CreateLoad(slot_ptr, "count_star_val");
-      Value* count_inc = builder.CreateAdd(slot_loaded, const_one, "count_star_inc");
-      builder.CreateStore(count_inc, slot_ptr);
-    } else {
-      Function* update_slot_fn = CodegenUpdateSlot(codegen, i, slot_desc);
-      if (update_slot_fn == nullptr) return nullptr;
-
-      // Load agg_fn_evals_[i]
-      DCHECK(agg_fn_evals_[i] != nullptr);
-      Value* agg_fn_eval_arg = codegen->CodegenArrayAt(
-          &builder, agg_fn_evals_vector, i, "agg_fn_eval");
-      builder.CreateCall(update_slot_fn, {agg_fn_eval_arg, agg_tuple_arg, row_arg});
-    }
-  }
-  builder.CreateRetVoid();
-
-  // CodegenProcessRowBatch() does the final optimizations.
-  return codegen->FinalizeFunction(fn);
-}
-
-Function* AggregationNode::CodegenProcessRowBatch(LlvmCodeGen* codegen,
-    Function* update_tuple_fn) {
-  SCOPED_TIMER(codegen->codegen_timer());
-  DCHECK(update_tuple_fn != nullptr);
-
-  // Get the cross compiled update row batch function
-  IRFunction::Type ir_fn = (!grouping_exprs_.empty() ?
-      IRFunction::AGG_NODE_PROCESS_ROW_BATCH_WITH_GROUPING :
-      IRFunction::AGG_NODE_PROCESS_ROW_BATCH_NO_GROUPING);
-  Function* process_batch_fn = codegen->GetFunction(ir_fn, true);
-
-  if (process_batch_fn == nullptr) {
-    LOG(ERROR) << "Could not find AggregationNode::ProcessRowBatch in module.";
-    return nullptr;
-  }
-
-  int replaced;
-  if (!grouping_exprs_.empty()) {
-    // Aggregation w/o grouping does not use a hash table.
-
-    // Codegen for hash
-    Function* hash_fn = hash_tbl_->CodegenHashCurrentRow(codegen);
-    if (hash_fn == nullptr) return nullptr;
-
-    // Codegen HashTable::Equals
-    Function* equals_fn = hash_tbl_->CodegenEquals(codegen);
-    if (equals_fn == nullptr) return nullptr;
-
-    // Codegen for evaluating build rows
-    Function* eval_build_row_fn = hash_tbl_->CodegenEvalTupleRow(codegen, true);
-    if (eval_build_row_fn == nullptr) return nullptr;
-
-    // Codegen for evaluating probe rows
-    Function* eval_probe_row_fn = hash_tbl_->CodegenEvalTupleRow(codegen, false);
-    if (eval_probe_row_fn == nullptr) return nullptr;
-
-    // Replace call sites
-    replaced =
-        codegen->ReplaceCallSites(process_batch_fn, eval_build_row_fn, "EvalBuildRow");
-    DCHECK_EQ(replaced, 1);
-
-    replaced =
-        codegen->ReplaceCallSites(process_batch_fn, eval_probe_row_fn, "EvalProbeRow");
-    DCHECK_EQ(replaced, 1);
-
-    replaced = codegen->ReplaceCallSites(process_batch_fn, hash_fn, "HashCurrentRow");
-    DCHECK_EQ(replaced, 2);
-
-    replaced = codegen->ReplaceCallSites(process_batch_fn, equals_fn, "Equals");
-    DCHECK_EQ(replaced, 1);
-  }
-
-  replaced = codegen->ReplaceCallSites(process_batch_fn, update_tuple_fn, "UpdateTuple");
-  DCHECK_EQ(replaced, 1);
-
-  return codegen->FinalizeFunction(process_batch_fn);
-}
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/507bd8be/be/src/exec/aggregation-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/aggregation-node.h b/be/src/exec/aggregation-node.h
deleted file mode 100644
index 0ea2f49..0000000
--- a/be/src/exec/aggregation-node.h
+++ /dev/null
@@ -1,174 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-
-#ifndef IMPALA_EXEC_AGGREGATION_NODE_H
-#define IMPALA_EXEC_AGGREGATION_NODE_H
-
-#include <boost/scoped_ptr.hpp>
-
-#include "exec/exec-node.h"
-#include "exec/old-hash-table.h"
-#include "exprs/agg-fn.h"
-#include "runtime/descriptors.h"  // for TupleId
-#include "runtime/mem-pool.h"
-#include "runtime/string-value.h"
-
-namespace llvm {
-  class Function;
-}
-
-namespace impala {
-
-class AggFnEvaluator;
-class LlvmCodeGen;
-class RowBatch;
-class RuntimeState;
-struct StringValue;
-class Tuple;
-class TupleDescriptor;
-class SlotDescriptor;
-
-/// Node for in-memory hash aggregation.
-/// The node creates a hash set of aggregation intermediate tuples, which
-/// contain slots for all grouping and aggregation exprs (the grouping
-/// slots precede the aggregation expr slots in the output tuple descriptor).
-//
-/// TODO: codegen cross-compiled UDAs and get rid of handcrafted IR.
-/// TODO: investigate high compile time for wide tables
-class AggregationNode : public ExecNode {
- public:
-  AggregationNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs);
-
-  virtual Status Init(const TPlanNode& tnode, RuntimeState* state);
-  virtual Status Prepare(RuntimeState* state);
-  virtual void Codegen(RuntimeState* state);
-  virtual Status Open(RuntimeState* state);
-  virtual Status GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos);
-  virtual Status Reset(RuntimeState* state);
-  virtual void Close(RuntimeState* state);
-
-  static const char* LLVM_CLASS_NAME;
-
- protected:
-  virtual Status QueryMaintenance(RuntimeState* state);
-  virtual void DebugString(int indentation_level, std::stringstream* out) const;
-
- private:
-  boost::scoped_ptr<OldHashTable> hash_tbl_;
-  OldHashTable::Iterator output_iterator_;
-
-  /// The list of all aggregate operations for this exec node.
-  std::vector<AggFn*> agg_fns_;
-  std::vector<AggFnEvaluator*> agg_fn_evals_;
-
-  /// Backing MemPools of 'agg_fn_evals_'.
-  boost::scoped_ptr<MemPool> agg_fn_pool_;
-
-  /// Group-by exprs used to evaluate input rows.
-  std::vector<ScalarExpr*> grouping_exprs_;
-
-  /// Exprs used to insert constructed aggregation tuple into the hash table.
-  /// All the exprs are simply SlotRefs for the intermediate tuple.
-  std::vector<ScalarExpr*> build_exprs_;
-
-  /// Tuple into which Update()/Merge()/Serialize() results are stored.
-  TupleId intermediate_tuple_id_;
-  TupleDescriptor* intermediate_tuple_desc_;
-
-  /// Construct a new row desc for preparing the build exprs because neither the child's
-  /// nor this node's output row desc may contain the intermediate tuple, e.g.,
-  /// in a single-node plan with an intermediate tuple different from the output tuple.
-  /// Lives in the query state's obj_pool.
-  RowDescriptor* intermediate_row_desc_;
-
-  /// Tuple into which Finalize() results are stored. Possibly the same as
-  /// the intermediate tuple.
-  TupleId output_tuple_id_;
-  TupleDescriptor* output_tuple_desc_;
-
-  /// Intermediate result of aggregation w/o GROUP BY.
-  /// Note: can be NULL even if there is no grouping if the result tuple is 0 width
-  Tuple* singleton_intermediate_tuple_;
-
-  boost::scoped_ptr<MemPool> tuple_pool_;
-
-  /// IR for process row batch.  NULL if codegen is disabled.
-  llvm::Function* codegen_process_row_batch_fn_;
-
-  typedef void (*ProcessRowBatchFn)(AggregationNode*, RowBatch*);
-  /// Jitted ProcessRowBatch function pointer.  Null if codegen is disabled.
-  ProcessRowBatchFn process_row_batch_fn_;
-
-  /// Certain aggregates require a finalize step, which is the final step of the
-  /// aggregate after consuming all input rows. The finalize step converts the aggregate
-  /// value into its final form. This is true if this node contains aggregate that requires
-  /// a finalize step.
-  bool needs_finalize_;
-
-  /// Time spent processing the child rows
-  RuntimeProfile::Counter* build_timer_;
-  /// Time spent returning the aggregated rows
-  RuntimeProfile::Counter* get_results_timer_;
-  /// Num buckets in hash table
-  RuntimeProfile::Counter* hash_table_buckets_counter_;
-  /// Load factor in hash table
-  RuntimeProfile::Counter* hash_table_load_factor_counter_;
-
-  /// Constructs a new aggregation intermediate tuple (allocated from tuple_pool_),
-  /// initialized to grouping values computed over 'current_row_'.
-  /// Aggregation expr slots are set to their initial values.
-  Tuple* ConstructIntermediateTuple();
-
-  /// Updates the aggregation intermediate tuple 'tuple' with aggregation values
-  /// computed over 'row'. This function is replaced by codegen.
-  void UpdateTuple(Tuple* tuple, TupleRow* row);
-
-  /// Called on the intermediate tuple of each group after all input rows have been
-  /// consumed and aggregated. Computes the final aggregate values to be returned in
-  /// GetNext() using the agg fn evaluators' Serialize() or Finalize().
-  /// For the Finalize() case if the output tuple is different from the intermediate
-  /// tuple, then a new tuple is allocated from 'pool' to hold the final result.
-  /// Returns the tuple holding the final aggregate values.
-  Tuple* FinalizeTuple(Tuple* tuple, MemPool* pool);
-
-  /// Cross-compiled accessor for 'agg_fn_evals_'. Used by the codegen'ed code.
-  AggFnEvaluator* const* IR_ALWAYS_INLINE agg_fn_evals() const;
-
-  /// Do the aggregation for all tuple rows in the batch
-  void ProcessRowBatchNoGrouping(RowBatch* batch);
-  void ProcessRowBatchWithGrouping(RowBatch* batch);
-
-  /// Codegen the process row batch loop.  The loop has already been compiled to
-  /// IR and loaded into the codegen object.  UpdateAggTuple has also been
-  /// codegen'd to IR.  This function will modify the loop subsituting the
-  /// UpdateAggTuple function call with the (inlined) codegen'd 'update_tuple_fn'.
-  llvm::Function* CodegenProcessRowBatch(LlvmCodeGen* codegen,
-      llvm::Function* update_tuple_fn);
-
-  /// Codegen for updating aggregate_exprs at agg_fn_idx. Returns NULL if unsuccessful.
-  /// agg_fn_idx is the idx into agg_fns_ (does not include grouping exprs).
-  llvm::Function* CodegenUpdateSlot(LlvmCodeGen* codegen, int agg_fn_idx,
-      SlotDescriptor* slot_desc);
-
-  /// Codegen UpdateTuple(). Returns NULL if codegen is unsuccessful.
-  llvm::Function* CodegenUpdateTuple(LlvmCodeGen* codegen);
-};
-
-}
-
-#endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/507bd8be/be/src/exec/blocking-join-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/blocking-join-node.cc b/be/src/exec/blocking-join-node.cc
index eeaccd6..7ccddc0 100644
--- a/be/src/exec/blocking-join-node.cc
+++ b/be/src/exec/blocking-join-node.cc
@@ -153,13 +153,7 @@ void BlockingJoinNode::ProcessBuildInputAsync(
     *status = child(1)->Open(state);
   }
   if (status->ok()) *status = AcquireResourcesForBuild(state);
-  if (status->ok()) {
-    if (build_sink == nullptr){
-      *status = ProcessBuildInput(state);
-    } else {
-      *status = SendBuildInputToSink<true>(state, build_sink);
-    }
-  }
+  if (status->ok()) *status = SendBuildInputToSink<true>(state, build_sink);
   // IMPALA-1863: If the build-side thread failed, then we need to close the right
   // (build-side) child to avoid a potential deadlock between fragment instances.  This
   // is safe to do because while the build may have partially completed, it will not be
@@ -226,11 +220,7 @@ Status BlockingJoinNode::ProcessBuildInputAndOpenProbe(
     RETURN_IF_ERROR(child(0)->Open(state));
     RETURN_IF_ERROR(child(1)->Open(state));
     RETURN_IF_ERROR(AcquireResourcesForBuild(state));
-    if (build_sink == NULL) {
-      RETURN_IF_ERROR(ProcessBuildInput(state));
-    } else {
-      RETURN_IF_ERROR(SendBuildInputToSink<false>(state, build_sink));
-    }
+    RETURN_IF_ERROR(SendBuildInputToSink<false>(state, build_sink));
   } else {
     // The left/right child never overlap. The overlap stops here.
     built_probe_overlap_stop_watch_.SetTimeCeiling();
@@ -238,11 +228,7 @@ Status BlockingJoinNode::ProcessBuildInputAndOpenProbe(
     // can release any resources only used during its Open().
     RETURN_IF_ERROR(child(1)->Open(state));
     RETURN_IF_ERROR(AcquireResourcesForBuild(state));
-    if (build_sink == NULL) {
-      RETURN_IF_ERROR(ProcessBuildInput(state));
-    } else {
-      RETURN_IF_ERROR(SendBuildInputToSink<false>(state, build_sink));
-    }
+    RETURN_IF_ERROR(SendBuildInputToSink<false>(state, build_sink));
     if (CanCloseBuildEarly()) child(1)->Close(state);
     RETURN_IF_ERROR(child(0)->Open(state));
   }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/507bd8be/be/src/exec/blocking-join-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/blocking-join-node.h b/be/src/exec/blocking-join-node.h
index 1972b34..b7dd79a 100644
--- a/be/src/exec/blocking-join-node.h
+++ b/be/src/exec/blocking-join-node.h
@@ -114,20 +114,9 @@ class BlockingJoinNode : public ExecNode {
   /// SendBuildInputToSink is called to allocate resources for this ExecNode.
   virtual Status AcquireResourcesForBuild(RuntimeState* state) { return Status::OK(); }
 
-  /// Processes the build-side input.
-  /// Called from ProcessBuildInputAndOpenProbe() if the subclass does not provide a
-  /// DataSink to consume the build input. The build-side input is already open when
-  /// this is called.
-  /// Note that this can be called concurrently with Open'ing the left child to
-  /// increase parallelism. If, for example, the left child is another join node,
-  /// it can start its own build at the same time.
-  /// TODO: move all subclasses to use the DataSink interface and remove this method.
-  virtual Status ProcessBuildInput(RuntimeState* state) = 0;
-
-  /// Processes the build-side input, which should be already open, and opens the probe
-  /// side. Will do both concurrently if not in a subplan and an extra thread token is
-  /// available. If 'build_sink' is non-NULL, sends the build-side input to 'build_sink'.
-  /// Otherwise calls ProcessBuildInput on the subclass.
+  /// Processes the build-side input, which should be already open, by sending it to
+  /// 'build_sink', and opens the probe side. Will do both concurrently if not in a
+  /// subplan and an extra thread token is available.
   Status ProcessBuildInputAndOpenProbe(RuntimeState* state, DataSink* build_sink);
 
   /// Set up 'current_probe_row_' to point to the first input row from the left child

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/507bd8be/be/src/exec/exec-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/exec-node.cc b/be/src/exec/exec-node.cc
index 7954660..c3d9946 100644
--- a/be/src/exec/exec-node.cc
+++ b/be/src/exec/exec-node.cc
@@ -29,12 +29,10 @@
 #include "common/status.h"
 #include "exprs/scalar-expr.h"
 #include "exprs/scalar-expr-evaluator.h"
-#include "exec/aggregation-node.h"
 #include "exec/analytic-eval-node.h"
 #include "exec/data-source-scan-node.h"
 #include "exec/empty-set-node.h"
 #include "exec/exchange-node.h"
-#include "exec/hash-join-node.h"
 #include "exec/hbase-scan-node.h"
 #include "exec/hdfs-scan-node.h"
 #include "exec/hdfs-scan-node-mt.h"
@@ -64,9 +62,8 @@
 
 using namespace llvm;
 
-// TODO: remove when we remove hash-join-node.cc and aggregation-node.cc
-DEFINE_bool(enable_partitioned_hash_join, true, "Enable partitioned hash join");
-DEFINE_bool(enable_partitioned_aggregation, true, "Enable partitioned hash agg");
+DEFINE_bool(enable_partitioned_hash_join, true, "Deprecated - has no effect");
+DEFINE_bool(enable_partitioned_aggregation, true, "Deprecated - has no effect");
 
 namespace impala {
 
@@ -299,24 +296,10 @@ Status ExecNode::CreateNode(ObjectPool* pool, const TPlanNode& tnode,
       }
       break;
     case TPlanNodeType::AGGREGATION_NODE:
-      if (FLAGS_enable_partitioned_aggregation) {
-        *node = pool->Add(new PartitionedAggregationNode(pool, tnode, descs));
-      } else {
-        *node = pool->Add(new AggregationNode(pool, tnode, descs));
-      }
+      *node = pool->Add(new PartitionedAggregationNode(pool, tnode, descs));
       break;
     case TPlanNodeType::HASH_JOIN_NODE:
-      // The (old) HashJoinNode does not support left-anti, right-semi, and right-anti
-      // joins.
-      if (tnode.hash_join_node.join_op == TJoinOp::LEFT_ANTI_JOIN ||
-          tnode.hash_join_node.join_op == TJoinOp::RIGHT_SEMI_JOIN ||
-          tnode.hash_join_node.join_op == TJoinOp::RIGHT_ANTI_JOIN ||
-          tnode.hash_join_node.join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN ||
-          FLAGS_enable_partitioned_hash_join) {
-        *node = pool->Add(new PartitionedHashJoinNode(pool, tnode, descs));
-      } else {
-        *node = pool->Add(new HashJoinNode(pool, tnode, descs));
-      }
+      *node = pool->Add(new PartitionedHashJoinNode(pool, tnode, descs));
       break;
     case TPlanNodeType::NESTED_LOOP_JOIN_NODE:
       *node = pool->Add(new NestedLoopJoinNode(pool, tnode, descs));
@@ -350,13 +333,6 @@ Status ExecNode::CreateNode(ObjectPool* pool, const TPlanNode& tnode,
       *node = pool->Add(new SingularRowSrcNode(pool, tnode, descs));
       break;
     case TPlanNodeType::SUBPLAN_NODE:
-      if (!FLAGS_enable_partitioned_hash_join || !FLAGS_enable_partitioned_aggregation) {
-        error_msg << "Query referencing nested types is not supported because the "
-            << "--enable_partitioned_hash_join and/or --enable_partitioned_aggregation "
-            << "Impala Daemon start-up flags are set to false.\nTo enable nested types "
-            << "support please set those flags to true (they are enabled by default).";
-        return Status(error_msg.str());
-      }
       *node = pool->Add(new SubplanNode(pool, tnode, descs));
       break;
     case TPlanNodeType::UNNEST_NODE:

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/507bd8be/be/src/exec/hash-join-node-ir.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hash-join-node-ir.cc b/be/src/exec/hash-join-node-ir.cc
deleted file mode 100644
index 25aa556..0000000
--- a/be/src/exec/hash-join-node-ir.cc
+++ /dev/null
@@ -1,140 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "codegen/impala-ir.h"
-#include "exec/hash-join-node.h"
-#include "exec/old-hash-table.inline.h"
-#include "exprs/scalar-expr-evaluator.h"
-#include "runtime/row-batch.h"
-
-#include "common/names.h"
-
-using namespace impala;
-
-// Functions in this file are cross compiled to IR with clang.
-
-// Wrapper around ExecNode's eval conjuncts with a different function name.
-// This lets us distinguish between the join conjuncts vs. non-join conjuncts
-// for codegen.
-// Note: don't declare this static.  LLVM will pick the fastcc calling convention and
-// we will not be able to replace the functions with codegen'd versions.
-// TODO: explicitly set the calling convention?
-// TODO: investigate using fastcc for all codegen internal functions?
-bool IR_NO_INLINE EvalOtherJoinConjuncts2(
-    ScalarExprEvaluator* const* evals, int num_evals, TupleRow* row) {
-  return ExecNode::EvalConjuncts(evals, num_evals, row);
-}
-
-// CreateOutputRow, EvalOtherJoinConjuncts, and EvalConjuncts are replaced by
-// codegen.
-int HashJoinNode::ProcessProbeBatch(RowBatch* out_batch, RowBatch* probe_batch,
-    int max_added_rows) {
-  // This path does not handle full outer or right outer joins
-  DCHECK(!match_all_build_);
-
-  int row_idx = out_batch->AddRows(max_added_rows);
-  uint8_t* out_row_mem = reinterpret_cast<uint8_t*>(out_batch->GetRow(row_idx));
-  TupleRow* out_row = reinterpret_cast<TupleRow*>(out_row_mem);
-
-  int rows_returned = 0;
-  int probe_rows = probe_batch->num_rows();
-
-  ScalarExprEvaluator* const* other_conjunct_evals =
-      other_join_conjunct_evals_.data();
-  const int num_other_conjuncts = other_join_conjuncts_.size();
-  DCHECK_EQ(num_other_conjuncts, other_join_conjunct_evals_.size());
-
-  ScalarExprEvaluator* const* conjunct_evals = conjunct_evals_.data();
-  const int num_conjuncts = conjuncts_.size();
-  DCHECK_EQ(num_conjuncts, conjunct_evals_.size());
-
-  while (true) {
-    // Create output row for each matching build row
-    while (!hash_tbl_iterator_.AtEnd()) {
-      TupleRow* matched_build_row = hash_tbl_iterator_.GetRow();
-      hash_tbl_iterator_.Next<true>();
-
-      if (join_op_ == TJoinOp::LEFT_SEMI_JOIN) {
-        // Evaluate the non-equi-join conjuncts against a temp row assembled from all
-        // build and probe tuples.
-        if (num_other_conjuncts > 0) {
-          CreateOutputRow(semi_join_staging_row_, current_probe_row_, matched_build_row);
-          if (!EvalOtherJoinConjuncts2(other_conjunct_evals, num_other_conjuncts,
-                  semi_join_staging_row_)) {
-            continue;
-          }
-        }
-        out_batch->CopyRow(current_probe_row_, out_row);
-      } else {
-        CreateOutputRow(out_row, current_probe_row_, matched_build_row);
-        if (!EvalOtherJoinConjuncts2(
-                other_conjunct_evals, num_other_conjuncts, out_row)) {
-          continue;
-        }
-      }
-      matched_probe_ = true;
-
-      if (EvalConjuncts(conjunct_evals, num_conjuncts, out_row)) {
-        ++rows_returned;
-        // Filled up out batch or hit limit
-        if (UNLIKELY(rows_returned == max_added_rows)) goto end;
-        // Advance to next out row
-        out_row_mem += out_batch->row_byte_size();
-        out_row = reinterpret_cast<TupleRow*>(out_row_mem);
-      }
-
-      // Handle left semi-join
-      if (match_one_build_) {
-        hash_tbl_iterator_ = hash_tbl_->End();
-        break;
-      }
-    }
-
-    // Handle left outer-join
-    if (!matched_probe_ && match_all_probe_) {
-      CreateOutputRow(out_row, current_probe_row_, NULL);
-      matched_probe_ = true;
-      if (EvalConjuncts(conjunct_evals, num_conjuncts, out_row)) {
-        ++rows_returned;
-        if (UNLIKELY(rows_returned == max_added_rows)) goto end;
-        // Advance to next out row
-        out_row_mem += out_batch->row_byte_size();
-        out_row = reinterpret_cast<TupleRow*>(out_row_mem);
-      }
-    }
-
-    if (hash_tbl_iterator_.AtEnd()) {
-      // Advance to the next probe row
-      if (UNLIKELY(probe_batch_pos_ == probe_rows)) goto end;
-      current_probe_row_ = probe_batch->GetRow(probe_batch_pos_++);
-      hash_tbl_iterator_ = hash_tbl_->Find(current_probe_row_);
-      matched_probe_ = false;
-    }
-  }
-
-end:
-  if (match_one_build_ && matched_probe_) hash_tbl_iterator_ = hash_tbl_->End();
-  out_batch->CommitRows(rows_returned);
-  return rows_returned;
-}
-
-void HashJoinNode::ProcessBuildBatch(RowBatch* build_batch) {
-  // insert build row into our hash table
-  for (int i = 0; i < build_batch->num_rows(); ++i) {
-    hash_tbl_->Insert(build_batch->GetRow(i));
-  }
-}


[4/6] incubator-impala git commit: IMPALA-4674: Part 1: remove old aggs and joins

Posted by ta...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/507bd8be/be/src/exec/hash-join-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hash-join-node.cc b/be/src/exec/hash-join-node.cc
deleted file mode 100644
index c35d05d..0000000
--- a/be/src/exec/hash-join-node.cc
+++ /dev/null
@@ -1,673 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "exec/hash-join-node.h"
-
-#include <functional>
-#include <numeric>
-#include <sstream>
-
-#include "codegen/llvm-codegen.h"
-#include "exec/old-hash-table.inline.h"
-#include "exprs/scalar-expr.h"
-#include "gutil/strings/substitute.h"
-#include "runtime/mem-tracker.h"
-#include "runtime/row-batch.h"
-#include "runtime/runtime-filter.h"
-#include "runtime/runtime-filter-bank.h"
-#include "runtime/runtime-state.h"
-#include "runtime/tuple-row.h"
-#include "util/debug-util.h"
-#include "util/bloom-filter.h"
-#include "util/runtime-profile-counters.h"
-
-#include "gen-cpp/PlanNodes_types.h"
-
-#include "common/names.h"
-
-DEFINE_bool(enable_probe_side_filtering, true, "Deprecated.");
-
-using namespace impala;
-using namespace llvm;
-using namespace strings;
-
-const char* HashJoinNode::LLVM_CLASS_NAME = "class.impala::HashJoinNode";
-
-HashJoinNode::HashJoinNode(
-    ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs)
-  : BlockingJoinNode("HashJoinNode", tnode.hash_join_node.join_op, pool, tnode, descs),
-    is_not_distinct_from_(),
-    codegen_process_build_batch_fn_(NULL),
-    process_build_batch_fn_(NULL),
-    process_probe_batch_fn_(NULL) {
-  // The hash join node does not support cross or anti joins
-  DCHECK_NE(join_op_, TJoinOp::CROSS_JOIN);
-  DCHECK_NE(join_op_, TJoinOp::LEFT_ANTI_JOIN);
-  DCHECK_NE(join_op_, TJoinOp::RIGHT_SEMI_JOIN);
-  DCHECK_NE(join_op_, TJoinOp::RIGHT_ANTI_JOIN);
-  DCHECK_NE(join_op_, TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN);
-
-  match_all_probe_ =
-      join_op_ == TJoinOp::LEFT_OUTER_JOIN || join_op_ == TJoinOp::FULL_OUTER_JOIN;
-  match_one_build_ = join_op_ == TJoinOp::LEFT_SEMI_JOIN;
-  match_all_build_ =
-      join_op_ == TJoinOp::RIGHT_OUTER_JOIN || join_op_ == TJoinOp::FULL_OUTER_JOIN;
-}
-
-Status HashJoinNode::Init(const TPlanNode& tnode, RuntimeState* state) {
-  RETURN_IF_ERROR(BlockingJoinNode::Init(tnode, state));
-  DCHECK(tnode.__isset.hash_join_node);
-  const vector<TEqJoinCondition>& eq_join_conjuncts =
-      tnode.hash_join_node.eq_join_conjuncts;
-
-  for (int i = 0; i < eq_join_conjuncts.size(); ++i) {
-    ScalarExpr* probe_expr;
-    RETURN_IF_ERROR(ScalarExpr::Create(
-        eq_join_conjuncts[i].left, *child(0)->row_desc(), state, &probe_expr));
-    probe_exprs_.push_back(probe_expr);
-    ScalarExpr* build_expr;
-    RETURN_IF_ERROR(ScalarExpr::Create(
-        eq_join_conjuncts[i].right, *child(1)->row_desc(), state, &build_expr));
-    build_exprs_.push_back(build_expr);
-    is_not_distinct_from_.push_back(eq_join_conjuncts[i].is_not_distinct_from);
-  }
-
-  // other_join_conjunct_evals_ are evaluated in the context of rows assembled from
-  // all build and probe tuples; full_row_desc is not necessarily the same as the output
-  // row desc, e.g., because semi joins only return the build xor probe tuples
-  RowDescriptor full_row_desc(*child(0)->row_desc(), *child(1)->row_desc());
-  RETURN_IF_ERROR(ScalarExpr::Create(tnode.hash_join_node.other_join_conjuncts,
-      full_row_desc, state, &other_join_conjuncts_));
-
-  for (const TRuntimeFilterDesc& tfilter: tnode.runtime_filters) {
-    // If filter propagation not enabled, only consider building broadcast joins (that may
-    // be consumed by this fragment).
-    if (state->query_options().runtime_filter_mode != TRuntimeFilterMode::GLOBAL &&
-        !tfilter.is_broadcast_join) {
-      continue;
-    }
-    if (state->query_options().disable_row_runtime_filtering &&
-        !tfilter.applied_on_partition_columns) {
-      continue;
-    }
-    filters_.push_back(state->filter_bank()->RegisterFilter(tfilter, true));
-    ScalarExpr* filter_expr;
-    RETURN_IF_ERROR(
-        ScalarExpr::Create(tfilter.src_expr, *child(1)->row_desc(), state, &filter_expr));
-    filter_exprs_.push_back(filter_expr);
-  }
-  return Status::OK();
-}
-
-Status HashJoinNode::Prepare(RuntimeState* state) {
-  SCOPED_TIMER(runtime_profile_->total_time_counter());
-  RETURN_IF_ERROR(BlockingJoinNode::Prepare(state));
-
-  build_buckets_counter_ =
-      ADD_COUNTER(runtime_profile(), "BuildBuckets", TUnit::UNIT);
-  hash_tbl_load_factor_counter_ =
-      ADD_COUNTER(runtime_profile(), "LoadFactor", TUnit::DOUBLE_VALUE);
-
-  // build and probe exprs are evaluated in the context of the rows produced by our
-  // right and left children, respectively
-  RETURN_IF_ERROR(ScalarExprEvaluator::Create(other_join_conjuncts_, state, pool_,
-      expr_mem_pool(), &other_join_conjunct_evals_));
-  AddEvaluatorsToFree(other_join_conjunct_evals_);
-
-  // TODO: default buckets
-  const bool stores_nulls = join_op_ == TJoinOp::RIGHT_OUTER_JOIN
-      || join_op_ == TJoinOp::FULL_OUTER_JOIN
-      || std::accumulate(is_not_distinct_from_.begin(), is_not_distinct_from_.end(),
-                                false, std::logical_or<bool>());
-
-  RETURN_IF_ERROR(OldHashTable::Create(pool_, state, build_exprs_, probe_exprs_,
-      filter_exprs_, child(1)->row_desc()->tuple_descriptors().size(), stores_nulls,
-      is_not_distinct_from_, state->fragment_hash_seed(), mem_tracker(), filters_,
-      &hash_tbl_));
-  build_pool_.reset(new MemPool(mem_tracker()));
-  AddCodegenDisabledMessage(state);
-  return Status::OK();
-}
-
-void HashJoinNode::Codegen(RuntimeState* state) {
-  DCHECK(state->ShouldCodegen());
-  ExecNode::Codegen(state);
-  if (IsNodeCodegenDisabled()) return;
-
-  LlvmCodeGen* codegen = state->codegen();
-  DCHECK(codegen != NULL);
-  bool build_codegen_enabled = false;
-  bool probe_codegen_enabled = false;
-
-  // Codegen for hashing rows
-  Function* hash_fn = hash_tbl_->CodegenHashCurrentRow(codegen);
-  if (hash_fn != NULL) {
-    // Codegen for build path
-    codegen_process_build_batch_fn_ = CodegenProcessBuildBatch(codegen, hash_fn);
-    if (codegen_process_build_batch_fn_ != NULL) {
-      codegen->AddFunctionToJit(codegen_process_build_batch_fn_,
-          reinterpret_cast<void**>(&process_build_batch_fn_));
-      build_codegen_enabled = true;
-    }
-
-    // Codegen for probe path (only for left joins)
-    if (!match_all_build_) {
-      Function* codegen_process_probe_batch_fn =
-          CodegenProcessProbeBatch(codegen, hash_fn);
-      if (codegen_process_probe_batch_fn != NULL) {
-        codegen->AddFunctionToJit(codegen_process_probe_batch_fn,
-            reinterpret_cast<void**>(&process_probe_batch_fn_));
-        probe_codegen_enabled = true;
-      }
-    }
-  }
-  runtime_profile()->AddCodegenMsg(build_codegen_enabled, "", "Build Side");
-  runtime_profile()->AddCodegenMsg(probe_codegen_enabled, "", "Probe Side");
-}
-
-Status HashJoinNode::Reset(RuntimeState* state) {
-  DCHECK(false) << "NYI";
-  return Status("NYI");
-}
-
-void HashJoinNode::Close(RuntimeState* state) {
-  if (is_closed()) return;
-  if (hash_tbl_.get() != NULL) hash_tbl_->Close(state);
-  if (build_pool_.get() != NULL) build_pool_->FreeAll();
-  ScalarExprEvaluator::Close(other_join_conjunct_evals_, state);
-  ScalarExpr::Close(probe_exprs_);
-  ScalarExpr::Close(build_exprs_);
-  ScalarExpr::Close(other_join_conjuncts_);
-  ScalarExpr::Close(filter_exprs_);
-  BlockingJoinNode::Close(state);
-}
-
-Status HashJoinNode::Open(RuntimeState* state) {
-  SCOPED_TIMER(runtime_profile_->total_time_counter());
-  RETURN_IF_ERROR(BlockingJoinNode::Open(state));
-  RETURN_IF_ERROR(hash_tbl_->Open(state));
-  RETURN_IF_ERROR(ScalarExprEvaluator::Open(other_join_conjunct_evals_, state));
-
-  // Check for errors and free local allocations before opening children.
-  RETURN_IF_CANCELLED(state);
-  RETURN_IF_ERROR(QueryMaintenance(state));
-
-  RETURN_IF_ERROR(BlockingJoinNode::ProcessBuildInputAndOpenProbe(state, NULL));
-  RETURN_IF_ERROR(BlockingJoinNode::GetFirstProbeRow(state));
-  InitGetNext();
-  return Status::OK();
-}
-
-Status HashJoinNode::QueryMaintenance(RuntimeState* state) {
-  if (hash_tbl_.get() != nullptr) hash_tbl_->FreeLocalAllocations();
-  return ExecNode::QueryMaintenance(state);
-}
-
-Status HashJoinNode::ProcessBuildInput(RuntimeState* state) {
-  // Do a full scan of child(1) and store everything in hash_tbl_
-  // The hash join node needs to keep in memory all build tuples, including the tuple
-  // row ptrs.  The row ptrs are copied into the hash table's internal structure so they
-  // don't need to be stored in the build_pool_.
-  RowBatch build_batch(child(1)->row_desc(), state->batch_size(), mem_tracker());
-  while (true) {
-    RETURN_IF_CANCELLED(state);
-    RETURN_IF_ERROR(QueryMaintenance(state));
-    bool eos;
-    {
-      SCOPED_STOP_WATCH(&built_probe_overlap_stop_watch_);
-      RETURN_IF_ERROR(child(1)->GetNext(state, &build_batch, &eos));
-    }
-    SCOPED_TIMER(build_timer_);
-    // take ownership of tuple data of build_batch
-    build_pool_->AcquireData(build_batch.tuple_data_pool(), false);
-    RETURN_IF_ERROR(QueryMaintenance(state));
-
-    // Call codegen version if possible
-    if (process_build_batch_fn_ == NULL) {
-      ProcessBuildBatch(&build_batch);
-    } else {
-      process_build_batch_fn_(this, &build_batch);
-    }
-    VLOG_ROW << hash_tbl_->DebugString(true, false, child(1)->row_desc());
-
-    COUNTER_SET(build_row_counter_, hash_tbl_->size());
-    COUNTER_SET(build_buckets_counter_, hash_tbl_->num_buckets());
-    COUNTER_SET(hash_tbl_load_factor_counter_, hash_tbl_->load_factor());
-    build_batch.Reset();
-    DCHECK(!build_batch.AtCapacity());
-    if (eos) break;
-  }
-
-  if (filters_.size() > 0) {
-    int num_enabled_filters = hash_tbl_->AddBloomFilters();
-    if (num_enabled_filters == filters_.size()) {
-      runtime_profile()->AppendExecOption(
-          Substitute("$0 of $0 Runtime Filter$1 Published", filters_.size(),
-              filters_.size() == 1 ? "" : "s"));
-    } else {
-      string exec_option = Substitute("$0 of $1 Runtime Filter$2 Published, $3 Disabled",
-          num_enabled_filters, filters_.size(), filters_.size() == 1 ? "" : "s",
-          filters_.size() - num_enabled_filters);
-      runtime_profile()->AppendExecOption(exec_option);
-    }
-  }
-
-  return Status::OK();
-}
-
-void HashJoinNode::InitGetNext() {
-  if (current_probe_row_ == NULL) {
-    hash_tbl_iterator_ = hash_tbl_->Begin();
-  } else {
-    matched_probe_ = false;
-    hash_tbl_iterator_ = hash_tbl_->Find(current_probe_row_);
-  }
-}
-
-Status HashJoinNode::GetNext(RuntimeState* state, RowBatch* out_batch, bool* eos) {
-  SCOPED_TIMER(runtime_profile_->total_time_counter());
-  RETURN_IF_ERROR(ExecDebugAction(TExecNodePhase::GETNEXT, state));
-  RETURN_IF_CANCELLED(state);
-  RETURN_IF_ERROR(QueryMaintenance(state));
-  if (ReachedLimit()) {
-    *eos = true;
-    return Status::OK();
-  }
-  *eos = false;
-
-  // These cases are simpler and use a more efficient processing loop
-  if (!match_all_build_) {
-    if (eos_) {
-      *eos = true;
-      return Status::OK();
-    }
-    return LeftJoinGetNext(state, out_batch, eos);
-  }
-
-  const int num_other_conjuncts = other_join_conjuncts_.size();
-  DCHECK_EQ(num_other_conjuncts, other_join_conjunct_evals_.size());
-
-  const int num_conjuncts = conjuncts_.size();
-  DCHECK_EQ(num_conjuncts, conjunct_evals_.size());
-
-  // Explicitly manage the timer counter to avoid measuring time in the child
-  // GetNext call.
-  ScopedTimer<MonotonicStopWatch> probe_timer(probe_timer_);
-
-  while (!eos_) {
-    // create output rows as long as:
-    // 1) we haven't already created an output row for the probe row and are doing
-    //    a semi-join;
-    // 2) there are more matching build rows
-    while (!hash_tbl_iterator_.AtEnd()) {
-      int row_idx = out_batch->AddRow();
-      TupleRow* out_row = out_batch->GetRow(row_idx);
-
-      TupleRow* matched_build_row = hash_tbl_iterator_.GetRow();
-      CreateOutputRow(out_row, current_probe_row_, matched_build_row);
-      if (!EvalConjuncts(other_join_conjunct_evals_.data(),
-              num_other_conjuncts, out_row)) {
-        hash_tbl_iterator_.Next<true>();
-        continue;
-      }
-      // we have a match for the purpose of the (outer?) join as soon as we
-      // satisfy the JOIN clause conjuncts
-      matched_probe_ = true;
-      if (match_all_build_) {
-        // remember that we matched this build row
-        hash_tbl_iterator_.set_matched(true);
-        VLOG_ROW << "joined build row: " << matched_build_row;
-      }
-
-      hash_tbl_iterator_.Next<true>();
-      if (EvalConjuncts(conjunct_evals_.data(), num_conjuncts, out_row)) {
-        out_batch->CommitLastRow();
-        VLOG_ROW << "match row: " << PrintRow(out_row, *row_desc());
-        ++num_rows_returned_;
-        COUNTER_SET(rows_returned_counter_, num_rows_returned_);
-        if (out_batch->AtCapacity() || ReachedLimit()) {
-          *eos = ReachedLimit();
-          return Status::OK();
-        }
-      }
-    }
-
-    // If a probe row exists at this point, check whether we need to output the current
-    // probe row before getting a new probe batch. (IMPALA-2440)
-    bool probe_row_exists = probe_batch_->num_rows() > 0;
-    if (match_all_probe_ && !matched_probe_ && probe_row_exists) {
-      int row_idx = out_batch->AddRow();
-      TupleRow* out_row = out_batch->GetRow(row_idx);
-      CreateOutputRow(out_row, current_probe_row_, NULL);
-      if (EvalConjuncts(conjunct_evals_.data(), num_conjuncts, out_row)) {
-        out_batch->CommitLastRow();
-        VLOG_ROW << "match row: " << PrintRow(out_row, *row_desc());
-        ++num_rows_returned_;
-        COUNTER_SET(rows_returned_counter_, num_rows_returned_);
-        matched_probe_ = true;
-        if (out_batch->AtCapacity() || ReachedLimit()) {
-          *eos = ReachedLimit();
-          return Status::OK();
-        }
-      }
-    }
-
-    if (probe_batch_pos_ == probe_batch_->num_rows()) {
-      // pass on resources, out_batch might still need them
-      probe_batch_->TransferResourceOwnership(out_batch);
-      probe_batch_pos_ = 0;
-      if (out_batch->AtCapacity()) return Status::OK();
-      // get new probe batch
-      if (!probe_side_eos_) {
-        while (true) {
-          probe_timer.Stop();
-          RETURN_IF_ERROR(child(0)->GetNext(state, probe_batch_.get(), &probe_side_eos_));
-          probe_timer.Start();
-          if (probe_batch_->num_rows() == 0) {
-            // Empty batches can still contain IO buffers, which need to be passed up to
-            // the caller; transferring resources can fill up out_batch.
-            probe_batch_->TransferResourceOwnership(out_batch);
-            if (probe_side_eos_) {
-              eos_ = true;
-              break;
-            }
-            if (out_batch->AtCapacity()) return Status::OK();
-            continue;
-          } else {
-            COUNTER_ADD(probe_row_counter_, probe_batch_->num_rows());
-            break;
-          }
-        }
-      } else {
-        eos_ = true;
-      }
-      // finish up right outer join
-      if (eos_ && match_all_build_) {
-        hash_tbl_iterator_ = hash_tbl_->Begin();
-      }
-    }
-
-    if (eos_) break;
-
-    // join remaining rows in probe batch_
-    current_probe_row_ = probe_batch_->GetRow(probe_batch_pos_++);
-    VLOG_ROW << "probe row: " << GetLeftChildRowString(current_probe_row_);
-    matched_probe_ = false;
-    hash_tbl_iterator_ = hash_tbl_->Find(current_probe_row_);
-  }
-
-  *eos = true;
-  if (match_all_build_) {
-    // output remaining unmatched build rows
-    TupleRow* build_row = NULL;
-    while (!out_batch->AtCapacity() && !hash_tbl_iterator_.AtEnd()) {
-      build_row = hash_tbl_iterator_.GetRow();
-      bool matched = hash_tbl_iterator_.matched();
-      hash_tbl_iterator_.Next<false>();
-      if (matched) continue;
-
-      int row_idx = out_batch->AddRow();
-      TupleRow* out_row = out_batch->GetRow(row_idx);
-      CreateOutputRow(out_row, NULL, build_row);
-      if (EvalConjuncts(conjunct_evals_.data(), num_conjuncts, out_row)) {
-        out_batch->CommitLastRow();
-        VLOG_ROW << "match row: " << PrintRow(out_row, *row_desc());
-        ++num_rows_returned_;
-        COUNTER_SET(rows_returned_counter_, num_rows_returned_);
-        if (ReachedLimit()) {
-          *eos = true;
-          return Status::OK();
-        }
-      }
-    }
-    // we're done if there are no more rows left to check
-    *eos = hash_tbl_iterator_.AtEnd();
-  }
-  return Status::OK();
-}
-
-Status HashJoinNode::LeftJoinGetNext(RuntimeState* state,
-    RowBatch* out_batch, bool* eos) {
-  *eos = eos_;
-
-  ScopedTimer<MonotonicStopWatch> probe_timer(probe_timer_);
-  while (!eos_) {
-    // Compute max rows that should be added to out_batch
-    int64_t max_added_rows = out_batch->capacity() - out_batch->num_rows();
-    if (limit() != -1) max_added_rows = min(max_added_rows, limit() - rows_returned());
-
-    // Continue processing this row batch
-    if (process_probe_batch_fn_ == NULL) {
-      num_rows_returned_ +=
-          ProcessProbeBatch(out_batch, probe_batch_.get(), max_added_rows);
-    } else {
-      // Use codegen'd function
-      num_rows_returned_ +=
-          process_probe_batch_fn_(this, out_batch, probe_batch_.get(), max_added_rows);
-    }
-    COUNTER_SET(rows_returned_counter_, num_rows_returned_);
-
-    if (ReachedLimit() || out_batch->AtCapacity()) {
-      *eos = ReachedLimit();
-      break;
-    }
-
-    // Check to see if we're done processing the current probe batch
-    if (hash_tbl_iterator_.AtEnd() && probe_batch_pos_ == probe_batch_->num_rows()) {
-      probe_batch_->TransferResourceOwnership(out_batch);
-      probe_batch_pos_ = 0;
-      if (out_batch->AtCapacity()) break;
-      if (probe_side_eos_) {
-        *eos = eos_ = true;
-        break;
-      } else {
-        probe_timer.Stop();
-        RETURN_IF_ERROR(child(0)->GetNext(state, probe_batch_.get(), &probe_side_eos_));
-        probe_timer.Start();
-        COUNTER_ADD(probe_row_counter_, probe_batch_->num_rows());
-      }
-    }
-  }
-
-  return Status::OK();
-}
-
-void HashJoinNode::AddToDebugString(int indentation_level, stringstream* out) const {
-  *out << " hash_tbl=";
-  *out << string(indentation_level * 2, ' ');
-  *out << "HashTbl("
-       << " build_exprs=" << ScalarExpr::DebugString(build_exprs_)
-       << " probe_exprs=" << ScalarExpr::DebugString(probe_exprs_);
-  *out << ")";
-}
-
-// This codegen'd function should only be used for left join cases so it assumes that
-// the probe row is non-null.  For a left outer join, the IR looks like:
-// define void @CreateOutputRow(%"class.impala::BlockingJoinNode"* %this_ptr,
-//                              %"class.impala::TupleRow"* %out_arg,
-//                              %"class.impala::TupleRow"* %probe_arg,
-//                              %"class.impala::TupleRow"* %build_arg) {
-// entry:
-//   %out = bitcast %"class.impala::TupleRow"* %out_arg to i8**
-//   %probe = bitcast %"class.impala::TupleRow"* %probe_arg to i8**
-//   %build = bitcast %"class.impala::TupleRow"* %build_arg to i8**
-//   %0 = bitcast i8** %out to i8*
-//   %1 = bitcast i8** %probe to i8*
-//   call void @llvm.memcpy.p0i8.p0i8.i32(i8* %0, i8* %1, i32 16, i32 16, i1 false)
-//   %is_build_null = icmp eq i8** %build, null
-//   br i1 %is_build_null, label %build_null, label %build_not_null
-//
-// build_not_null:                                   ; preds = %entry
-//   %dst_tuple_ptr1 = getelementptr i8** %out, i32 1
-//   %src_tuple_ptr = getelementptr i8** %build, i32 0
-//   %2 = load i8** %src_tuple_ptr
-//   store i8* %2, i8** %dst_tuple_ptr1
-//   ret void
-//
-// build_null:                                       ; preds = %entry
-//   %dst_tuple_ptr = getelementptr i8** %out, i32 1
-//   call void @llvm.memcpy.p0i8.p0i8.i32(
-//      i8* %dst_tuple_ptr, i8* %1, i32 16, i32 16, i1 false)
-//   ret void
-// }
-Function* HashJoinNode::CodegenCreateOutputRow(LlvmCodeGen* codegen) {
-  Type* tuple_row_type = codegen->GetType(TupleRow::LLVM_CLASS_NAME);
-  DCHECK(tuple_row_type != NULL);
-  PointerType* tuple_row_ptr_type = PointerType::get(tuple_row_type, 0);
-
-  Type* this_type = codegen->GetType(BlockingJoinNode::LLVM_CLASS_NAME);
-  DCHECK(this_type != NULL);
-  PointerType* this_ptr_type = PointerType::get(this_type, 0);
-
-  // TupleRows are really just an array of pointers.  Easier to work with them
-  // this way.
-  PointerType* tuple_row_working_type = PointerType::get(codegen->ptr_type(), 0);
-
-  // Construct function signature to match CreateOutputRow()
-  LlvmCodeGen::FnPrototype prototype(codegen, "CreateOutputRow", codegen->void_type());
-  prototype.AddArgument(LlvmCodeGen::NamedVariable("this_ptr", this_ptr_type));
-  prototype.AddArgument(LlvmCodeGen::NamedVariable("out_arg", tuple_row_ptr_type));
-  prototype.AddArgument(LlvmCodeGen::NamedVariable("probe_arg", tuple_row_ptr_type));
-  prototype.AddArgument(LlvmCodeGen::NamedVariable("build_arg", tuple_row_ptr_type));
-
-  LLVMContext& context = codegen->context();
-  LlvmBuilder builder(context);
-  Value* args[4];
-  Function* fn = prototype.GeneratePrototype(&builder, args);
-  Value* out_row_arg = builder.CreateBitCast(args[1], tuple_row_working_type, "out");
-  Value* probe_row_arg = builder.CreateBitCast(args[2], tuple_row_working_type, "probe");
-  Value* build_row_arg = builder.CreateBitCast(args[3], tuple_row_working_type, "build");
-
-  int num_probe_tuples = child(0)->row_desc()->tuple_descriptors().size();
-  int num_build_tuples = child(1)->row_desc()->tuple_descriptors().size();
-
-  // Copy probe row
-  codegen->CodegenMemcpy(&builder, out_row_arg, probe_row_arg, probe_tuple_row_size_);
-  Value* build_row_idx[] = {codegen->GetIntConstant(TYPE_INT, num_probe_tuples)};
-  Value* build_row_dst =
-      builder.CreateInBoundsGEP(out_row_arg, build_row_idx, "build_dst_ptr");
-
-  // Copy build row.
-  BasicBlock* build_not_null_block = BasicBlock::Create(context, "build_not_null", fn);
-  BasicBlock* build_null_block = NULL;
-
-  if (match_all_probe_) {
-    // build tuple can be null
-    build_null_block = BasicBlock::Create(context, "build_null", fn);
-    Value* is_build_null = builder.CreateIsNull(build_row_arg, "is_build_null");
-    builder.CreateCondBr(is_build_null, build_null_block, build_not_null_block);
-
-    // Set tuple build ptrs to NULL
-    // TODO: this should be replaced with memset() but I can't get the llvm intrinsic
-    // to work.
-    builder.SetInsertPoint(build_null_block);
-    for (int i = 0; i < num_build_tuples; ++i) {
-      Value* array_idx[] = {codegen->GetIntConstant(TYPE_INT, i + num_probe_tuples)};
-      Value* dst = builder.CreateInBoundsGEP(out_row_arg, array_idx, "dst_tuple_ptr");
-      builder.CreateStore(codegen->null_ptr_value(), dst);
-    }
-    builder.CreateRetVoid();
-  } else {
-    // build row can't be NULL
-    builder.CreateBr(build_not_null_block);
-  }
-
-  // Copy build tuple ptrs
-  builder.SetInsertPoint(build_not_null_block);
-  codegen->CodegenMemcpy(&builder, build_row_dst, build_row_arg, build_tuple_row_size_);
-  builder.CreateRetVoid();
-
-  return codegen->FinalizeFunction(fn);
-}
-
-Function* HashJoinNode::CodegenProcessBuildBatch(LlvmCodeGen* codegen,
-    Function* hash_fn) {
-  // Get cross compiled function
-  Function* process_build_batch_fn =
-      codegen->GetFunction(IRFunction::HASH_JOIN_PROCESS_BUILD_BATCH, true);
-  DCHECK(process_build_batch_fn != NULL);
-
-  // Codegen for evaluating build rows
-  Function* eval_row_fn = hash_tbl_->CodegenEvalTupleRow(codegen, true);
-  if (eval_row_fn == NULL) return NULL;
-
-  int replaced = codegen->ReplaceCallSites(process_build_batch_fn, eval_row_fn,
-      "EvalBuildRow");
-  DCHECK_EQ(replaced, 1);
-
-  replaced = codegen->ReplaceCallSites(process_build_batch_fn, hash_fn, "HashCurrentRow");
-  DCHECK_EQ(replaced, 1);
-
-  return codegen->FinalizeFunction(process_build_batch_fn);
-}
-
-Function* HashJoinNode::CodegenProcessProbeBatch(LlvmCodeGen* codegen,
-    Function* hash_fn) {
-  // Get cross compiled function
-  Function* process_probe_batch_fn =
-      codegen->GetFunction(IRFunction::HASH_JOIN_PROCESS_PROBE_BATCH, true);
-  DCHECK(process_probe_batch_fn != NULL);
-
-  // Codegen HashTable::Equals()
-  Function* equals_fn = hash_tbl_->CodegenEquals(codegen);
-  if (equals_fn == NULL) return NULL;
-
-  // Codegen for evaluating build rows
-  Function* eval_row_fn = hash_tbl_->CodegenEvalTupleRow(codegen, false);
-  if (eval_row_fn == NULL) return NULL;
-
-  // Codegen CreateOutputRow()
-  Function* create_output_row_fn = CodegenCreateOutputRow(codegen);
-  if (create_output_row_fn == NULL) return NULL;
-
-  // Codegen evaluating other join conjuncts
-  Function* eval_other_conjuncts_fn;
-  Status status = ExecNode::CodegenEvalConjuncts(codegen, other_join_conjuncts_,
-      &eval_other_conjuncts_fn, "EvalOtherConjuncts");
-  if (!status.ok()) return NULL;
-
-  // Codegen evaluating conjuncts
-  Function* eval_conjuncts_fn;
-  status = ExecNode::CodegenEvalConjuncts(codegen, conjuncts_, &eval_conjuncts_fn);
-  if (!status.ok()) return NULL;
-
-  // Replace all call sites with codegen version
-  int replaced = codegen->ReplaceCallSites(process_probe_batch_fn, hash_fn,
-      "HashCurrentRow");
-  DCHECK_EQ(replaced, 1);
-
-  replaced = codegen->ReplaceCallSites(process_probe_batch_fn, eval_row_fn,
-      "EvalProbeRow");
-  DCHECK_EQ(replaced, 1);
-
-  replaced = codegen->ReplaceCallSites(process_probe_batch_fn, create_output_row_fn,
-      "CreateOutputRow");
-  DCHECK_EQ(replaced, 3);
-
-  replaced = codegen->ReplaceCallSites(process_probe_batch_fn, eval_conjuncts_fn,
-      "EvalConjuncts");
-  DCHECK_EQ(replaced, 2);
-
-  replaced = codegen->ReplaceCallSites(process_probe_batch_fn, eval_other_conjuncts_fn,
-      "EvalOtherJoinConjuncts");
-  DCHECK_EQ(replaced, 2);
-
-  replaced = codegen->ReplaceCallSites(process_probe_batch_fn, equals_fn, "Equals");
-  DCHECK_EQ(replaced, 2);
-
-  return codegen->FinalizeFunction(process_probe_batch_fn);
-}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/507bd8be/be/src/exec/hash-join-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hash-join-node.h b/be/src/exec/hash-join-node.h
deleted file mode 100644
index b49f8bb..0000000
--- a/be/src/exec/hash-join-node.h
+++ /dev/null
@@ -1,164 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-
-#ifndef IMPALA_EXEC_HASH_JOIN_NODE_H
-#define IMPALA_EXEC_HASH_JOIN_NODE_H
-
-#include <boost/scoped_ptr.hpp>
-#include <boost/thread.hpp>
-#include <string>
-
-#include "exec/exec-node.h"
-#include "exec/old-hash-table.h"
-#include "exec/blocking-join-node.h"
-#include "exprs/scalar-expr.h"
-#include "exprs/scalar-expr-evaluator.h"
-#include "util/promise.h"
-
-#include "gen-cpp/PlanNodes_types.h"  // for TJoinOp
-
-namespace impala {
-
-class MemPool;
-class RowBatch;
-class ScalarExpr;
-class ScalarExprEvaluator;
-class TupleRow;
-
-/// Node for in-memory hash joins:
-/// - builds up a hash table with the rows produced by our right input
-///   (child(1)); build exprs are the rhs exprs of our equi-join predicates
-/// - for each row from our left input, probes the hash table to retrieve
-///   matching entries; the probe exprs are the lhs exprs of our equi-join predicates
-//
-/// Row batches:
-/// - In general, we are not able to pass our output row batch on to our left child (when
-///   we're fetching the probe rows): if we have a 1xn join, our output will contain
-///   multiple rows per left input row
-/// - TODO: fix this, so in the case of 1x1/nx1 joins (for instance, fact to dimension tbl)
-///   we don't do these extra copies
-class HashJoinNode : public BlockingJoinNode {
- public:
-  HashJoinNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs);
-
-  virtual Status Init(const TPlanNode& tnode, RuntimeState* state);
-  virtual Status Prepare(RuntimeState* state);
-  virtual void Codegen(RuntimeState* state);
-  virtual Status Open(RuntimeState* state);
-  virtual Status GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos);
-  virtual Status Reset(RuntimeState* state);
-  virtual void Close(RuntimeState* state);
-
-  static const char* LLVM_CLASS_NAME;
-
- protected:
-  virtual Status QueryMaintenance(RuntimeState* state);
-  virtual void AddToDebugString(int indentation_level, std::stringstream* out) const;
-  virtual Status ProcessBuildInput(RuntimeState* state);
-
- private:
-  boost::scoped_ptr<OldHashTable> hash_tbl_;
-  OldHashTable::Iterator hash_tbl_iterator_;
-
-  /// holds everything referenced from build side
-  boost::scoped_ptr<MemPool> build_pool_;
-
-  /// our equi-join predicates "<lhs> = <rhs>" are separated into
-  /// build_exprs_ (over child(1)) and probe_exprs_ (over child(0))
-  std::vector<ScalarExpr*> probe_exprs_;
-  std::vector<ScalarExpr*> build_exprs_;
-
-  /// Expressions used to build runtime filters, one per entry in filters_.
-  std::vector<ScalarExpr*> filter_exprs_;
-
-  /// is_not_distinct_from_[i] is true if and only if the ith equi-join predicate is IS
-  /// NOT DISTINCT FROM, rather than equality.
-  std::vector<bool> is_not_distinct_from_;
-
-  /// non-equi-join conjuncts from the JOIN clause
-  std::vector<ScalarExpr*> other_join_conjuncts_;
-  std::vector<ScalarExprEvaluator*> other_join_conjunct_evals_;
-
-  /// Derived from join_op_
-  /// Output all rows coming from the probe input. Used in LEFT_OUTER_JOIN and
-  /// FULL_OUTER_JOIN.
-  bool match_all_probe_;
-
-  /// Match at most one build row to each probe row. Used in LEFT_SEMI_JOIN.
-  bool match_one_build_;
-
-  /// Output all rows coming from the build input. Used in RIGHT_OUTER_JOIN and
-  /// FULL_OUTER_JOIN.
-  bool match_all_build_;
-
-  /// llvm function for build batch
-  llvm::Function* codegen_process_build_batch_fn_;
-
-  /// Function declaration for codegen'd function.  Signature must match
-  /// HashJoinNode::ProcessBuildBatch
-  typedef void (*ProcessBuildBatchFn)(HashJoinNode*, RowBatch*);
-  ProcessBuildBatchFn process_build_batch_fn_;
-
-  /// HashJoinNode::ProcessProbeBatch() exactly
-  typedef int (*ProcessProbeBatchFn)(HashJoinNode*, RowBatch*, RowBatch*, int);
-  /// Jitted ProcessProbeBatch function pointer.  Null if codegen is disabled.
-  ProcessProbeBatchFn process_probe_batch_fn_;
-
-  /// RuntimeFilters to build.
-  std::vector<RuntimeFilter*> filters_;
-
-  RuntimeProfile::Counter* build_buckets_counter_;   // num buckets in hash table
-  RuntimeProfile::Counter* hash_tbl_load_factor_counter_;
-
-  /// Prepares for the first call to GetNext(). Must be called after GetFirstProbeRow().
-  void InitGetNext();
-
-  /// GetNext helper function for the common join cases: Inner join, left semi and left
-  /// outer
-  Status LeftJoinGetNext(RuntimeState* state, RowBatch* row_batch, bool* eos);
-
-  /// Processes a probe batch for the common (non right-outer join) cases.
-  ///  out_batch: the batch for resulting tuple rows
-  ///  probe_batch: the probe batch to process.  This function can be called to
-  ///    continue processing a batch in the middle
-  ///  max_added_rows: maximum rows that can be added to out_batch
-  /// return the number of rows added to out_batch
-  int ProcessProbeBatch(RowBatch* out_batch, RowBatch* probe_batch, int max_added_rows);
-
-  /// Construct the build hash table, adding all the rows in 'build_batch'
-  void ProcessBuildBatch(RowBatch* build_batch);
-
-  /// Codegen function to create output row
-  llvm::Function* CodegenCreateOutputRow(LlvmCodeGen* codegen);
-
-  /// Codegen processing build batches.  Identical signature to ProcessBuildBatch.
-  /// hash_fn is the codegen'd function for computing hashes over tuple rows in the
-  /// hash table.
-  /// Returns NULL if codegen was not possible.
-  llvm::Function* CodegenProcessBuildBatch(LlvmCodeGen* codegen, llvm::Function* hash_fn);
-
-  /// Codegen processing probe batches.  Identical signature to ProcessProbeBatch.
-  /// hash_fn is the codegen'd function for computing hashes over tuple rows in the
-  /// hash table.
-  /// Returns NULL if codegen was not possible.
-  llvm::Function* CodegenProcessProbeBatch(LlvmCodeGen* codegen, llvm::Function* hash_fn);
-};
-
-}
-
-#endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/507bd8be/be/src/exec/hdfs-parquet-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner.cc b/be/src/exec/hdfs-parquet-scanner.cc
index 63298cb..e97572c 100644
--- a/be/src/exec/hdfs-parquet-scanner.cc
+++ b/be/src/exec/hdfs-parquet-scanner.cc
@@ -54,8 +54,6 @@ using namespace llvm;
 
 DEFINE_double(parquet_min_filter_reject_ratio, 0.1, "(Advanced) If the percentage of "
     "rows rejected by a runtime filter drops below this value, the filter is disabled.");
-DECLARE_bool(enable_partitioned_aggregation);
-DECLARE_bool(enable_partitioned_hash_join);
 
 // The number of row batches between checks to see if a filter is effective, and
 // should be disabled. Must be a power of two.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/507bd8be/be/src/exec/nested-loop-join-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/nested-loop-join-node.cc b/be/src/exec/nested-loop-join-node.cc
index 01a7f19..d377a22 100644
--- a/be/src/exec/nested-loop-join-node.cc
+++ b/be/src/exec/nested-loop-join-node.cc
@@ -186,11 +186,6 @@ Status NestedLoopJoinNode::ResetMatchingBuildRows(RuntimeState* state, int64_t n
   return Status::OK();
 }
 
-Status NestedLoopJoinNode::ProcessBuildInput(RuntimeState* state) {
-  DCHECK(false) << "Should not be called, NLJ uses the BuildSink API";
-  return Status::OK();
-}
-
 void NestedLoopJoinNode::ResetForProbe() {
   DCHECK(build_batches_ != NULL);
   build_row_iterator_ = build_batches_->Iterator();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/507bd8be/be/src/exec/nested-loop-join-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/nested-loop-join-node.h b/be/src/exec/nested-loop-join-node.h
index 94f1dae..c94abbf 100644
--- a/be/src/exec/nested-loop-join-node.h
+++ b/be/src/exec/nested-loop-join-node.h
@@ -51,9 +51,6 @@ class NestedLoopJoinNode : public BlockingJoinNode {
   virtual Status Reset(RuntimeState* state);
   virtual void Close(RuntimeState* state);
 
- protected:
-  virtual Status ProcessBuildInput(RuntimeState* state);
-
  private:
   /////////////////////////////////////////
   /// BEGIN: Members that must be Reset()

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/507bd8be/be/src/exec/old-hash-table-ir.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/old-hash-table-ir.cc b/be/src/exec/old-hash-table-ir.cc
deleted file mode 100644
index 2436ef1..0000000
--- a/be/src/exec/old-hash-table-ir.cc
+++ /dev/null
@@ -1,42 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#ifdef IR_COMPILE
-
-#include "exec/old-hash-table.h"
-
-namespace impala {
-
-uint8_t* OldHashTable::expr_values_buffer() const {
-  return expr_values_buffer_;
-}
-
-uint8_t* OldHashTable::expr_value_null_bits() const {
-  return expr_value_null_bits_;
-}
-
-ScalarExprEvaluator* const* OldHashTable::build_expr_evals() const {
-  return build_expr_evals_.data();
-}
-
-ScalarExprEvaluator* const* OldHashTable::probe_expr_evals() const {
-  return probe_expr_evals_.data();
-}
-
-}
-
-#endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/507bd8be/be/src/exec/old-hash-table-test.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/old-hash-table-test.cc b/be/src/exec/old-hash-table-test.cc
deleted file mode 100644
index e873791..0000000
--- a/be/src/exec/old-hash-table-test.cc
+++ /dev/null
@@ -1,337 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include <stdlib.h>
-#include <stdio.h>
-#include <iostream>
-#include <vector>
-
-#include "common/compiler-util.h"
-#include "exec/old-hash-table.inline.h"
-#include "exprs/scalar-expr.h"
-#include "exprs/scalar-expr-evaluator.h"
-#include "exprs/slot-ref.h"
-#include "runtime/mem-pool.h"
-#include "runtime/mem-tracker.h"
-#include "runtime/string-value.h"
-#include "runtime/tuple-row.h"
-#include "testutil/gtest-util.h"
-#include "util/cpu-info.h"
-#include "util/runtime-profile-counters.h"
-
-#include "common/names.h"
-
-namespace impala {
-
-class OldHashTableTest : public testing::Test {
- public:
-  OldHashTableTest() : mem_pool_(&tracker_) {}
-
- protected:
-  ObjectPool pool_;
-  MemTracker tracker_;
-  MemPool mem_pool_;
-
-  vector<ScalarExpr*> build_exprs_;
-  vector<ScalarExprEvaluator*> build_expr_evals_;
-  vector<ScalarExpr*> probe_exprs_;
-  vector<ScalarExprEvaluator*> probe_expr_evals_;
-
-  virtual void SetUp() {
-    RowDescriptor desc;
-    // Not very easy to test complex tuple layouts so this test will use the
-    // simplest.  The purpose of these tests is to exercise the hash map
-    // internals so a simple build/probe expr is fine.
-    ScalarExpr* build_expr = pool_.Add(new SlotRef(TYPE_INT, 0));
-    ASSERT_OK(build_expr->Init(desc, nullptr));
-    build_exprs_.push_back(build_expr);
-    ASSERT_OK(ScalarExprEvaluator::Create(build_exprs_, nullptr, &pool_, &mem_pool_,
-        &build_expr_evals_));
-    ASSERT_OK(ScalarExprEvaluator::Open(build_expr_evals_, nullptr));
-
-    ScalarExpr* probe_expr = pool_.Add(new SlotRef(TYPE_INT, 0));
-    ASSERT_OK(probe_expr->Init(desc, nullptr));
-    probe_exprs_.push_back(probe_expr);
-    ASSERT_OK(ScalarExprEvaluator::Create(probe_exprs_, nullptr, &pool_, &mem_pool_,
-        &probe_expr_evals_));
-    ASSERT_OK(ScalarExprEvaluator::Open(probe_expr_evals_, nullptr));
-  }
-
-  virtual void TearDown() {
-    ScalarExprEvaluator::Close(build_expr_evals_, nullptr);
-    ScalarExprEvaluator::Close(probe_expr_evals_, nullptr);
-    ScalarExpr::Close(build_exprs_);
-    ScalarExpr::Close(probe_exprs_);
-  }
-
-  TupleRow* CreateTupleRow(int32_t val) {
-    uint8_t* tuple_row_mem = mem_pool_.Allocate(sizeof(int32_t*));
-    Tuple* tuple_mem = Tuple::Create(sizeof(int32_t), &mem_pool_);
-    *reinterpret_cast<int32_t*>(tuple_mem) = val;
-    TupleRow* row = reinterpret_cast<TupleRow*>(tuple_row_mem);
-    row->SetTuple(0, tuple_mem);
-    return row;
-  }
-
-  // Wrapper to call private methods on OldHashTable
-  // TODO: understand google testing, there must be a more natural way to do this
-  void ResizeTable(OldHashTable* table, int64_t new_size) {
-    table->ResizeBuckets(new_size);
-  }
-
-  // Do a full table scan on table.  All values should be between [min,max).  If
-  // all_unique, then each key(int value) should only appear once.  Results are
-  // stored in results, indexed by the key.  Results must have been preallocated to
-  // be at least max size.
-  void FullScan(OldHashTable* table, int min, int max, bool all_unique,
-      TupleRow** results, TupleRow** expected) {
-    OldHashTable::Iterator iter = table->Begin();
-    while (iter != table->End()) {
-      TupleRow* row = iter.GetRow();
-      int32_t val = *reinterpret_cast<int32_t*>(build_expr_evals_[0]->GetValue(row));
-      EXPECT_GE(val, min);
-      EXPECT_LT(val, max);
-      if (all_unique) EXPECT_TRUE(results[val] == nullptr);
-      EXPECT_EQ(row->GetTuple(0), expected[val]->GetTuple(0));
-      results[val] = row;
-      iter.Next<false>();
-    }
-  }
-
-  // Validate that probe_row evaluates overs probe_exprs is equal to build_row
-  // evaluated over build_exprs
-  void ValidateMatch(TupleRow* probe_row, TupleRow* build_row) {
-    EXPECT_TRUE(probe_row != build_row);
-    int32_t build_val =
-        *reinterpret_cast<int32_t*>(build_expr_evals_[0]->GetValue(probe_row));
-    int32_t probe_val =
-        *reinterpret_cast<int32_t*>(probe_expr_evals_[0]->GetValue(build_row));
-    EXPECT_EQ(build_val, probe_val);
-  }
-
-  struct ProbeTestData {
-    TupleRow* probe_row;
-    vector<TupleRow*> expected_build_rows;
-  };
-
-  void ProbeTest(OldHashTable* table, ProbeTestData* data, int num_data, bool scan) {
-    for (int i = 0; i < num_data; ++i) {
-      TupleRow* row = data[i].probe_row;
-
-      OldHashTable::Iterator iter;
-      iter = table->Find(row);
-
-      if (data[i].expected_build_rows.size() == 0) {
-        EXPECT_TRUE(iter == table->End());
-      } else {
-        if (scan) {
-          map<TupleRow*, bool> matched;
-          while (iter != table->End()) {
-            EXPECT_TRUE(matched.find(iter.GetRow()) == matched.end());
-            matched[iter.GetRow()] = true;
-            iter.Next<true>();
-          }
-          EXPECT_EQ(matched.size(), data[i].expected_build_rows.size());
-          for (int j = 0; i < data[j].expected_build_rows.size(); ++j) {
-            EXPECT_TRUE(matched[data[i].expected_build_rows[j]]);
-          }
-        } else {
-          EXPECT_EQ(data[i].expected_build_rows.size(), 1);
-          EXPECT_TRUE(
-              data[i].expected_build_rows[0]->GetTuple(0) == iter.GetRow()->GetTuple(0));
-          ValidateMatch(row, iter.GetRow());
-        }
-      }
-    }
-  }
-};
-
-TEST_F(OldHashTableTest, SetupTest) {
-  TupleRow* build_row1 = CreateTupleRow(1);
-  TupleRow* build_row2 = CreateTupleRow(2);
-  TupleRow* probe_row3 = CreateTupleRow(3);
-  TupleRow* probe_row4 = CreateTupleRow(4);
-
-  int32_t* val_row1 =
-      reinterpret_cast<int32_t*>(build_expr_evals_[0]->GetValue(build_row1));
-  EXPECT_EQ(*val_row1, 1);
-  int32_t* val_row2 =
-      reinterpret_cast<int32_t*>(build_expr_evals_[0]->GetValue(build_row2));
-  EXPECT_EQ(*val_row2, 2);
-  int32_t* val_row3 =
-      reinterpret_cast<int32_t*>(probe_expr_evals_[0]->GetValue(probe_row3));
-  EXPECT_EQ(*val_row3, 3);
-  int32_t* val_row4 =
-      reinterpret_cast<int32_t*>(probe_expr_evals_[0]->GetValue(probe_row4));
-  EXPECT_EQ(*val_row4, 4);
-
-  mem_pool_.FreeAll();
-}
-
-// This tests inserts the build rows [0->5) to hash table.  It validates that they
-// are all there using a full table scan.  It also validates that Find() is correct
-// testing for probe rows that are both there and not.
-// The hash table is rehashed a few times and the scans/finds are tested again.
-TEST_F(OldHashTableTest, BasicTest) {
-  TupleRow* build_rows[5];
-  TupleRow* scan_rows[5] = {0};
-  for (int i = 0; i < 5; ++i) {
-    build_rows[i] = CreateTupleRow(i);
-  }
-
-  ProbeTestData probe_rows[10];
-  for (int i = 0; i < 10; ++i) {
-    probe_rows[i].probe_row = CreateTupleRow(i);
-    if (i < 5) {
-      probe_rows[i].expected_build_rows.push_back(build_rows[i]);
-    }
-  }
-
-  // Create the hash table and insert the build rows
-  MemTracker tracker;
-  scoped_ptr<OldHashTable> hash_table;
-  EXPECT_OK(OldHashTable::Create(&pool_, nullptr, build_exprs_, probe_exprs_,
-      vector<ScalarExpr*>(), 1, false, std::vector<bool>(build_exprs_.size(), false),
-      0, &tracker, vector<RuntimeFilter*>(), &hash_table));
-  EXPECT_OK(hash_table->Open(nullptr));
-  for (int i = 0; i < 5; ++i) {
-    hash_table->Insert(build_rows[i]);
-  }
-  EXPECT_EQ(hash_table->size(), 5);
-
-  // Do a full table scan and validate returned pointers
-  FullScan(hash_table.get(), 0, 5, true, scan_rows, build_rows);
-  ProbeTest(hash_table.get(), probe_rows, 10, false);
-
-  // Resize and scan again
-  ResizeTable(hash_table.get(), 64);
-  EXPECT_EQ(hash_table->num_buckets(), 64);
-  EXPECT_EQ(hash_table->size(), 5);
-  memset(scan_rows, 0, sizeof(scan_rows));
-  FullScan(hash_table.get(), 0, 5, true, scan_rows, build_rows);
-  ProbeTest(hash_table.get(), probe_rows, 10, false);
-
-  // Resize to two and cause some collisions
-  ResizeTable(hash_table.get(), 2);
-  EXPECT_EQ(hash_table->num_buckets(), 2);
-  EXPECT_EQ(hash_table->size(), 5);
-  memset(scan_rows, 0, sizeof(scan_rows));
-  FullScan(hash_table.get(), 0, 5, true, scan_rows, build_rows);
-  ProbeTest(hash_table.get(), probe_rows, 10, false);
-
-  // Resize to one and turn it into a linked list
-  ResizeTable(hash_table.get(), 1);
-  EXPECT_EQ(hash_table->num_buckets(), 1);
-  EXPECT_EQ(hash_table->size(), 5);
-  memset(scan_rows, 0, sizeof(scan_rows));
-  FullScan(hash_table.get(), 0, 5, true, scan_rows, build_rows);
-  ProbeTest(hash_table.get(), probe_rows, 10, false);
-
-  hash_table->Close(nullptr);
-  mem_pool_.FreeAll();
-}
-
-// This tests makes sure we can scan ranges of buckets
-TEST_F(OldHashTableTest, ScanTest) {
-  MemTracker tracker;
-  scoped_ptr<OldHashTable> hash_table;
-  EXPECT_OK(OldHashTable::Create(&pool_, nullptr, build_exprs_, probe_exprs_,
-      vector<ScalarExpr*>(), 1, false, std::vector<bool>(build_exprs_.size(), false),
-      0, &tracker, vector<RuntimeFilter*>(), &hash_table));
-  EXPECT_OK(hash_table->Open(nullptr));
-  // Add 1 row with val 1, 2 with val 2, etc
-  vector<TupleRow*> build_rows;
-  ProbeTestData probe_rows[15];
-  probe_rows[0].probe_row = CreateTupleRow(0);
-  for (int val = 1; val <= 10; ++val) {
-    probe_rows[val].probe_row = CreateTupleRow(val);
-    for (int i = 0; i < val; ++i) {
-      TupleRow* row = CreateTupleRow(val);
-      hash_table->Insert(row);
-      build_rows.push_back(row);
-      probe_rows[val].expected_build_rows.push_back(row);
-    }
-  }
-
-  // Add some more probe rows that aren't there
-  for (int val = 11; val < 15; ++val) {
-    probe_rows[val].probe_row = CreateTupleRow(val);
-  }
-
-  // Test that all the builds were found
-  ProbeTest(hash_table.get(), probe_rows, 15, true);
-
-  // Resize and try again
-  ResizeTable(hash_table.get(), 128);
-  EXPECT_EQ(hash_table->num_buckets(), 128);
-  ProbeTest(hash_table.get(), probe_rows, 15, true);
-
-  ResizeTable(hash_table.get(), 16);
-  EXPECT_EQ(hash_table->num_buckets(), 16);
-  ProbeTest(hash_table.get(), probe_rows, 15, true);
-
-  ResizeTable(hash_table.get(), 2);
-  EXPECT_EQ(hash_table->num_buckets(), 2);
-  ProbeTest(hash_table.get(), probe_rows, 15, true);
-
-  hash_table->Close(nullptr);
-  mem_pool_.FreeAll();
-}
-
-// This test continues adding to the hash table to trigger the resize code paths
-TEST_F(OldHashTableTest, GrowTableTest) {
-  int num_to_add = 4;
-  int expected_size = 0;
-  MemTracker tracker(100 * 1024 * 1024);
-  scoped_ptr<OldHashTable> hash_table;
-  EXPECT_OK(OldHashTable::Create(&pool_, nullptr, build_exprs_, probe_exprs_,
-      vector<ScalarExpr*>(), 1, false, std::vector<bool>(build_exprs_.size(), false),
-      0, &tracker, vector<RuntimeFilter*>(), &hash_table, false, num_to_add));
-  EXPECT_OK(hash_table->Open(nullptr));
-  EXPECT_FALSE(hash_table->mem_limit_exceeded());
-  EXPECT_TRUE(!tracker.LimitExceeded());
-
-  // This inserts about 5M entries
-  int build_row_val = 0;
-  for (int i = 0; i < 20; ++i) {
-    for (int j = 0; j < num_to_add; ++build_row_val, ++j) {
-      hash_table->Insert(CreateTupleRow(build_row_val));
-    }
-    expected_size += num_to_add;
-    num_to_add *= 2;
-  }
-  EXPECT_TRUE(hash_table->mem_limit_exceeded());
-  EXPECT_TRUE(tracker.LimitExceeded());
-
-  // Validate that we can find the entries before we went over the limit
-  for (int i = 0; i < expected_size * 5; i += 100000) {
-    TupleRow* probe_row = CreateTupleRow(i);
-    OldHashTable::Iterator iter = hash_table->Find(probe_row);
-    if (i < hash_table->size()) {
-      EXPECT_TRUE(iter != hash_table->End());
-      ValidateMatch(probe_row, iter.GetRow());
-    } else {
-      EXPECT_TRUE(iter == hash_table->End());
-    }
-  }
-  hash_table->Close(nullptr);
-  mem_pool_.FreeAll();
-}
-
-}
-
-IMPALA_TEST_MAIN();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/507bd8be/be/src/exec/old-hash-table.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/old-hash-table.cc b/be/src/exec/old-hash-table.cc
deleted file mode 100644
index 9105226..0000000
--- a/be/src/exec/old-hash-table.cc
+++ /dev/null
@@ -1,872 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "exec/old-hash-table.inline.h"
-
-#include <functional>
-#include <numeric>
-
-#include "codegen/codegen-anyval.h"
-#include "codegen/llvm-codegen.h"
-#include "exprs/scalar-expr.h"
-#include "exprs/scalar-expr-evaluator.h"
-#include "exprs/slot-ref.h"
-#include "runtime/mem-tracker.h"
-#include "runtime/raw-value.inline.h"
-#include "runtime/runtime-filter.h"
-#include "runtime/runtime-filter-bank.h"
-#include "runtime/runtime-state.h"
-#include "runtime/string-value.inline.h"
-#include "runtime/tuple-row.h"
-#include "util/bloom-filter.h"
-#include "runtime/tuple.h"
-#include "util/debug-util.h"
-#include "util/error-util.h"
-#include "util/impalad-metrics.h"
-
-#include "common/names.h"
-
-using namespace impala;
-using namespace llvm;
-
-const char* OldHashTable::LLVM_CLASS_NAME = "class.impala::OldHashTable";
-
-const float OldHashTable::MAX_BUCKET_OCCUPANCY_FRACTION = 0.75f;
-static const int HT_PAGE_SIZE = 8 * 1024 * 1024;
-
-// Put a non-zero constant in the result location for NULL.
-// We don't want(NULL, 1) to hash to the same as (0, 1).
-// This needs to be as big as the biggest primitive type since the bytes
-// get copied directly.
-// TODO find a better approach, since primitives like CHAR(N) can be up to 128 bytes
-static int64_t NULL_VALUE[] = { HashUtil::FNV_SEED, HashUtil::FNV_SEED,
-                                HashUtil::FNV_SEED, HashUtil::FNV_SEED,
-                                HashUtil::FNV_SEED, HashUtil::FNV_SEED,
-                                HashUtil::FNV_SEED, HashUtil::FNV_SEED,
-                                HashUtil::FNV_SEED, HashUtil::FNV_SEED,
-                                HashUtil::FNV_SEED, HashUtil::FNV_SEED,
-                                HashUtil::FNV_SEED, HashUtil::FNV_SEED,
-                                HashUtil::FNV_SEED, HashUtil::FNV_SEED };
-
-OldHashTable::OldHashTable(RuntimeState* state,
-    const vector<ScalarExpr*>& build_exprs, const vector<ScalarExpr*>& probe_exprs,
-    const vector<ScalarExpr*>& filter_exprs, int num_build_tuples, bool stores_nulls,
-    const vector<bool>& finds_nulls, int32_t initial_seed, MemTracker* mem_tracker,
-    const vector<RuntimeFilter*>& runtime_filters, bool stores_tuples,
-    int64_t num_buckets)
-  : state_(state),
-    build_exprs_(build_exprs),
-    probe_exprs_(probe_exprs),
-    filter_exprs_(filter_exprs),
-    filters_(runtime_filters),
-    num_build_tuples_(num_build_tuples),
-    stores_nulls_(stores_nulls),
-    finds_nulls_(finds_nulls),
-    finds_some_nulls_(std::accumulate(
-        finds_nulls_.begin(), finds_nulls_.end(), false, std::logical_or<bool>())),
-    stores_tuples_(stores_tuples),
-    initial_seed_(initial_seed),
-    num_filled_buckets_(0),
-    num_nodes_(0),
-    mem_pool_(new MemPool(mem_tracker)),
-    num_data_pages_(0),
-    next_node_(NULL),
-    node_remaining_current_page_(0),
-    mem_tracker_(mem_tracker),
-    mem_limit_exceeded_(false) {
-  DCHECK(mem_tracker != NULL);
-  DCHECK_EQ(build_exprs_.size(), probe_exprs_.size());
-  DCHECK_EQ(build_exprs_.size(), finds_nulls_.size());
-  DCHECK_EQ((num_buckets & (num_buckets-1)), 0) << "num_buckets must be a power of 2";
-  buckets_.resize(num_buckets);
-  num_buckets_ = num_buckets;
-  num_buckets_till_resize_ = MAX_BUCKET_OCCUPANCY_FRACTION * num_buckets_;
-  mem_tracker_->Consume(buckets_.capacity() * sizeof(Bucket));
-
-  // Compute the layout and buffer size to store the evaluated expr results
-  results_buffer_size_ = ScalarExpr::ComputeResultsLayout(build_exprs_,
-      &expr_values_buffer_offsets_, &var_result_begin_);
-  expr_values_buffer_= new uint8_t[results_buffer_size_];
-  memset(expr_values_buffer_, 0, sizeof(uint8_t) * results_buffer_size_);
-  expr_value_null_bits_ = new uint8_t[build_exprs_.size()];
-
-  GrowNodeArray();
-}
-
-Status OldHashTable::Init(ObjectPool* pool, RuntimeState* state) {
-  RETURN_IF_ERROR(ScalarExprEvaluator::Create(build_exprs_, state, pool,
-      mem_pool_.get(), &build_expr_evals_));
-  DCHECK_EQ(build_exprs_.size(), build_expr_evals_.size());
-  RETURN_IF_ERROR(ScalarExprEvaluator::Create(probe_exprs_, state, pool,
-      mem_pool_.get(), &probe_expr_evals_));
-  DCHECK_EQ(probe_exprs_.size(), probe_expr_evals_.size());
-  RETURN_IF_ERROR(ScalarExprEvaluator::Create(filter_exprs_, state, pool,
-      mem_pool_.get(), &filter_expr_evals_));
-  DCHECK_EQ(filter_exprs_.size(), filter_expr_evals_.size());
-  return Status::OK();
-}
-
-Status OldHashTable::Create(ObjectPool* pool, RuntimeState* state,
-    const vector<ScalarExpr*>& build_exprs, const vector<ScalarExpr*>& probe_exprs,
-    const vector<ScalarExpr*>& filter_exprs, int num_build_tuples, bool stores_nulls,
-    const vector<bool>& finds_nulls, int32_t initial_seed, MemTracker* mem_tracker,
-    const vector<RuntimeFilter*>& runtime_filters, scoped_ptr<OldHashTable>* hash_tbl,
-    bool stores_tuples, int64_t num_buckets) {
-  hash_tbl->reset(new OldHashTable(state, build_exprs, probe_exprs, filter_exprs,
-      num_build_tuples, stores_nulls, finds_nulls, initial_seed, mem_tracker,
-      runtime_filters, stores_tuples, num_buckets));
-  return (*hash_tbl)->Init(pool, state);
-}
-
-Status OldHashTable::Open(RuntimeState* state) {
-  RETURN_IF_ERROR(ScalarExprEvaluator::Open(build_expr_evals_, state));
-  DCHECK_EQ(build_exprs_.size(), build_expr_evals_.size());
-  RETURN_IF_ERROR(ScalarExprEvaluator::Open(probe_expr_evals_, state));
-  DCHECK_EQ(probe_exprs_.size(), probe_expr_evals_.size());
-  RETURN_IF_ERROR(ScalarExprEvaluator::Open(filter_expr_evals_, state));
-  DCHECK_EQ(filter_exprs_.size(), filter_expr_evals_.size());
-  return Status::OK();
-}
-
-void OldHashTable::Close(RuntimeState* state) {
-  // TODO: use tr1::array?
-  delete[] expr_values_buffer_;
-  delete[] expr_value_null_bits_;
-  expr_values_buffer_ = NULL;
-  expr_value_null_bits_ = NULL;
-  ScalarExprEvaluator::Close(build_expr_evals_, state);
-  ScalarExprEvaluator::Close(probe_expr_evals_, state);
-  ScalarExprEvaluator::Close(filter_expr_evals_, state);
-  mem_pool_->FreeAll();
-  if (ImpaladMetrics::HASH_TABLE_TOTAL_BYTES != NULL) {
-    ImpaladMetrics::HASH_TABLE_TOTAL_BYTES->Increment(-num_data_pages_ * HT_PAGE_SIZE);
-  }
-  mem_tracker_->Release(buckets_.capacity() * sizeof(Bucket));
-  buckets_.clear();
-}
-
-void OldHashTable::FreeLocalAllocations() {
-  ScalarExprEvaluator::FreeLocalAllocations(build_expr_evals_);
-  ScalarExprEvaluator::FreeLocalAllocations(probe_expr_evals_);
-  ScalarExprEvaluator::FreeLocalAllocations(filter_expr_evals_);
-}
-
-bool OldHashTable::EvalRow(
-    TupleRow* row, const vector<ScalarExprEvaluator*>& evals) {
-  bool has_null = false;
-  for (int i = 0; i < evals.size(); ++i) {
-    void* loc = expr_values_buffer_ + expr_values_buffer_offsets_[i];
-    void* val = evals[i]->GetValue(row);
-    if (val == NULL) {
-      // If the table doesn't store nulls, no reason to keep evaluating
-      if (!stores_nulls_) return true;
-
-      expr_value_null_bits_[i] = true;
-      val = &NULL_VALUE;
-      has_null = true;
-    } else {
-      expr_value_null_bits_[i] = false;
-    }
-    RawValue::Write(val, loc, build_exprs_[i]->type(), NULL);
-  }
-  return has_null;
-}
-
-int OldHashTable::AddBloomFilters() {
-  int num_enabled_filters = 0;
-  vector<BloomFilter*> bloom_filters;
-  bloom_filters.resize(filters_.size());
-  for (int i = 0; i < filters_.size(); ++i) {
-    if (state_->filter_bank()->FpRateTooHigh(filters_[i]->filter_size(), size())) {
-      bloom_filters[i] = BloomFilter::ALWAYS_TRUE_FILTER;
-    } else {
-      bloom_filters[i] =
-          state_->filter_bank()->AllocateScratchBloomFilter(filters_[i]->id());
-      ++num_enabled_filters;
-    }
-  }
-
-  OldHashTable::Iterator iter = Begin();
-  while (iter != End()) {
-    TupleRow* row = iter.GetRow();
-    for (int i = 0; i < filters_.size(); ++i) {
-      if (bloom_filters[i] == NULL) continue;
-      void* e = filter_expr_evals_[i]->GetValue(row);
-      uint32_t h = RawValue::GetHashValue(e, filter_exprs_[i]->type(),
-          RuntimeFilterBank::DefaultHashSeed());
-      bloom_filters[i]->Insert(h);
-    }
-    iter.Next<false>();
-  }
-
-  // Update all the local filters in the filter bank.
-  for (int i = 0; i < filters_.size(); ++i) {
-    state_->filter_bank()->UpdateFilterFromLocal(filters_[i]->id(), bloom_filters[i]);
-  }
-
-  return num_enabled_filters;
-}
-
-// Helper function to store a value into the results buffer if the expr
-// evaluated to NULL.  We don't want (NULL, 1) to hash to the same as (0,1) so
-// we'll pick a more random value.
-static void CodegenAssignNullValue(
-    LlvmCodeGen* codegen, LlvmBuilder* builder, Value* dst, const ColumnType& type) {
-  uint64_t fnv_seed = HashUtil::FNV_SEED;
-
-  if (type.type == TYPE_STRING || type.type == TYPE_VARCHAR) {
-    Value* dst_ptr = builder->CreateStructGEP(NULL, dst, 0, "string_ptr");
-    Value* dst_len = builder->CreateStructGEP(NULL, dst, 1, "string_len");
-    Value* null_len = codegen->GetIntConstant(TYPE_INT, fnv_seed);
-    Value* null_ptr = builder->CreateIntToPtr(null_len, codegen->ptr_type());
-    builder->CreateStore(null_ptr, dst_ptr);
-    builder->CreateStore(null_len, dst_len);
-    return;
-  } else {
-    Value* null_value = NULL;
-    int byte_size = type.GetByteSize();
-    // Get a type specific representation of fnv_seed
-    switch (type.type) {
-      case TYPE_BOOLEAN:
-        // In results, booleans are stored as 1 byte
-        dst = builder->CreateBitCast(dst, codegen->ptr_type());
-        null_value = codegen->GetIntConstant(TYPE_TINYINT, fnv_seed);
-        break;
-      case TYPE_TIMESTAMP: {
-        // Cast 'dst' to 'i128*'
-        DCHECK_EQ(byte_size, 16);
-        PointerType* fnv_seed_ptr_type =
-            codegen->GetPtrType(Type::getIntNTy(codegen->context(), byte_size * 8));
-        dst = builder->CreateBitCast(dst, fnv_seed_ptr_type);
-        null_value = codegen->GetIntConstant(byte_size, fnv_seed, fnv_seed);
-        break;
-      }
-      case TYPE_TINYINT:
-      case TYPE_SMALLINT:
-      case TYPE_INT:
-      case TYPE_BIGINT:
-      case TYPE_DECIMAL:
-        null_value = codegen->GetIntConstant(byte_size, fnv_seed, fnv_seed);
-        break;
-      case TYPE_FLOAT: {
-        // Don't care about the value, just the bit pattern
-        float fnv_seed_float = *reinterpret_cast<float*>(&fnv_seed);
-        null_value = ConstantFP::get(codegen->context(), APFloat(fnv_seed_float));
-        break;
-      }
-      case TYPE_DOUBLE: {
-        // Don't care about the value, just the bit pattern
-        double fnv_seed_double = *reinterpret_cast<double*>(&fnv_seed);
-        null_value = ConstantFP::get(codegen->context(), APFloat(fnv_seed_double));
-        break;
-      }
-      default:
-        DCHECK(false);
-    }
-    builder->CreateStore(null_value, dst);
-  }
-}
-
-// Codegen for evaluating a tuple row over either build_exprs_ or probe_exprs_.
-// For the case where we are joining on a single int, the IR looks like
-// define i1 @EvaBuildRow(%"class.impala::OldHashTable"* %this_ptr,
-//                        %"class.impala::TupleRow"* %row) {
-// entry:
-//   %null_ptr = alloca i1
-//   %0 = bitcast %"class.impala::TupleRow"* %row to i8**
-//   %eval = call i32 @SlotRef(i8** %0, i8* null, i1* %null_ptr)
-//   %1 = load i1* %null_ptr
-//   br i1 %1, label %null, label %not_null
-//
-// null:                                             ; preds = %entry
-//   ret i1 true
-//
-// not_null:                                         ; preds = %entry
-//   store i32 %eval, i32* inttoptr (i64 46146336 to i32*)
-//   br label %continue
-//
-// continue:                                         ; preds = %not_null
-//   %2 = zext i1 %1 to i8
-//   store i8 %2, i8* inttoptr (i64 46146248 to i8*)
-//   ret i1 false
-// }
-// For each expr, we create 3 code blocks.  The null, not null and continue blocks.
-// Both the null and not null branch into the continue block.  The continue block
-// becomes the start of the next block for codegen (either the next expr or just the
-// end of the function).
-Function* OldHashTable::CodegenEvalTupleRow(LlvmCodeGen* codegen, bool build) {
-  DCHECK_EQ(build_exprs_.size(), probe_exprs_.size());
-  const vector<ScalarExpr*>& exprs = build ? build_exprs_ : probe_exprs_;
-  for (int i = 0; i < exprs.size(); ++i) {
-    PrimitiveType type = exprs[i]->type().type;
-    if (type == TYPE_CHAR) return NULL;
-  }
-
-  // Get types to generate function prototype
-  Type* tuple_row_type = codegen->GetType(TupleRow::LLVM_CLASS_NAME);
-  DCHECK(tuple_row_type != NULL);
-  PointerType* tuple_row_ptr_type = PointerType::get(tuple_row_type, 0);
-
-  Type* this_type = codegen->GetType(OldHashTable::LLVM_CLASS_NAME);
-  DCHECK(this_type != NULL);
-  PointerType* this_ptr_type = PointerType::get(this_type, 0);
-
-  LlvmCodeGen::FnPrototype prototype(codegen, build ? "EvalBuildRow" : "EvalProbeRow",
-      codegen->GetType(TYPE_BOOLEAN));
-  prototype.AddArgument(LlvmCodeGen::NamedVariable("this_ptr", this_ptr_type));
-  prototype.AddArgument(LlvmCodeGen::NamedVariable("row", tuple_row_ptr_type));
-
-  LLVMContext& context = codegen->context();
-  LlvmBuilder builder(context);
-  Value* args[2];
-  Function* fn = prototype.GeneratePrototype(&builder, args);
-  Value* this_ptr = args[0];
-  Value* row = args[1];
-  Value* has_null = codegen->false_value();
-
-  IRFunction::Type fn_name = build ?
-      IRFunction::OLD_HASH_TABLE_GET_BUILD_EXPR_EVALUATORS :
-      IRFunction::OLD_HASH_TABLE_GET_PROBE_EXPR_EVALUATORS;
-  Function* get_expr_eval_fn = codegen->GetFunction(fn_name, false);
-  DCHECK(get_expr_eval_fn != NULL);
-
-  // Aggregation with no grouping exprs also use the hash table interface for
-  // code simplicity. In that case, there are no build exprs.
-  if (!exprs.empty()) {
-    // Load build_expr_evals_.data() / probe_expr_evals_.data()
-    Value* eval_vector = codegen->CodegenCallFunction(&builder, build ?
-        IRFunction::OLD_HASH_TABLE_GET_BUILD_EXPR_EVALUATORS :
-        IRFunction::OLD_HASH_TABLE_GET_PROBE_EXPR_EVALUATORS,
-        this_ptr, "eval_vector");
-
-    // Load expr_values_buffer_
-    Value* expr_values_buffer = codegen->CodegenCallFunction(&builder,
-        IRFunction::OLD_HASH_TABLE_GET_EXPR_VALUES_BUFFER,
-        this_ptr, "expr_values_buffer");
-
-    // Load expr_values_null_bits_
-    Value* expr_value_null_bits = codegen->CodegenCallFunction(&builder,
-        IRFunction::OLD_HASH_TABLE_GET_EXPR_VALUE_NULL_BITS,
-        this_ptr, "expr_value_null_bits");
-
-    for (int i = 0; i < exprs.size(); ++i) {
-      BasicBlock* null_block = BasicBlock::Create(context, "null", fn);
-      BasicBlock* not_null_block = BasicBlock::Create(context, "not_null", fn);
-      BasicBlock* continue_block = BasicBlock::Create(context, "continue", fn);
-
-      // loc_addr = expr_values_buffer_ + expr_values_buffer_offsets_[i]
-      Value* llvm_loc = builder.CreateInBoundsGEP(NULL, expr_values_buffer,
-          codegen->GetIntConstant(TYPE_INT, expr_values_buffer_offsets_[i]), "loc_addr");
-      llvm_loc = builder.CreatePointerCast(llvm_loc,
-          codegen->GetPtrType(exprs[i]->type()), "loc");
-
-      // Codegen GetValue() for exprs[i]
-      Function* expr_fn;
-      Status status = exprs[i]->GetCodegendComputeFn(codegen, &expr_fn);
-      if (!status.ok()) {
-        fn->eraseFromParent(); // deletes function
-        VLOG_QUERY << "Failed to codegen EvalTupleRow(): " << status.GetDetail();
-        return NULL;
-      }
-
-      // Load evals[i] and call GetValue()
-      Value* eval_arg =
-          codegen->CodegenArrayAt(&builder, eval_vector, i, "eval");
-      DCHECK(eval_arg->getType()->isPointerTy());
-      CodegenAnyVal result = CodegenAnyVal::CreateCallWrapped(codegen, &builder,
-          exprs[i]->type(), expr_fn, {eval_arg, row}, "result");
-      Value* is_null = result.GetIsNull();
-
-      // Set null-byte result
-      Value* null_bits = builder.CreateZExt(is_null, codegen->GetType(TYPE_TINYINT));
-      Value* llvm_null_bits_loc = builder.CreateInBoundsGEP(NULL, expr_value_null_bits,
-          codegen->GetIntConstant(TYPE_INT, i), "null_bits_loc");
-      builder.CreateStore(null_bits, llvm_null_bits_loc);
-      builder.CreateCondBr(is_null, null_block, not_null_block);
-
-      // Null block
-      builder.SetInsertPoint(null_block);
-      if (!stores_nulls_) {
-        // hash table doesn't store nulls, no reason to keep evaluating exprs
-        builder.CreateRet(codegen->true_value());
-      } else {
-        CodegenAssignNullValue(codegen, &builder, llvm_loc, exprs[i]->type());
-        has_null = codegen->true_value();
-        builder.CreateBr(continue_block);
-      }
-
-      // Not null block
-      builder.SetInsertPoint(not_null_block);
-      result.ToNativePtr(llvm_loc);
-      builder.CreateBr(continue_block);
-
-      builder.SetInsertPoint(continue_block);
-    }
-  }
-  builder.CreateRet(has_null);
-  return codegen->FinalizeFunction(fn);
-}
-
-uint32_t OldHashTable::HashVariableLenRow() {
-  uint32_t hash = initial_seed_;
-  // Hash the non-var length portions (if there are any)
-  if (var_result_begin_ != 0) {
-    hash = HashUtil::Hash(expr_values_buffer_, var_result_begin_, hash);
-  }
-
-  for (int i = 0; i < build_exprs_.size(); ++i) {
-    // non-string and null slots are already part of expr_values_buffer
-    if (build_exprs_[i]->type().type != TYPE_STRING &&
-        build_exprs_[i]->type().type != TYPE_VARCHAR) {
-      continue;
-    }
-
-    void* loc = expr_values_buffer_ + expr_values_buffer_offsets_[i];
-    if (expr_value_null_bits_[i]) {
-      // Hash the null random seed values at 'loc'
-      hash = HashUtil::Hash(loc, sizeof(StringValue), hash);
-    } else {
-      // Hash the string
-      StringValue* str = reinterpret_cast<StringValue*>(loc);
-      hash = HashUtil::Hash(str->ptr, str->len, hash);
-    }
-  }
-  return hash;
-}
-
-// Codegen for hashing the current row.  In the case with both string and non-string data
-// (group by int_col, string_col), the IR looks like:
-// define i32 @HashCurrentRow(%"class.impala::OldHashTable"* %this_ptr) {
-// entry:
-//   %0 = call i32 @IrCrcHash(i8* inttoptr (i64 51107808 to i8*), i32 16, i32 0)
-//   %1 = load i8* inttoptr (i64 29500112 to i8*)
-//   %2 = icmp ne i8 %1, 0
-//   br i1 %2, label %null, label %not_null
-//
-// null:                                             ; preds = %entry
-//   %3 = call i32 @IrCrcHash(i8* inttoptr (i64 51107824 to i8*), i32 16, i32 %0)
-//   br label %continue
-//
-// not_null:                                         ; preds = %entry
-//   %4 = load i8** getelementptr inbounds (
-//        %"struct.impala::StringValue"* inttoptr
-//          (i64 51107824 to %"struct.impala::StringValue"*), i32 0, i32 0)
-//   %5 = load i32* getelementptr inbounds (
-//        %"struct.impala::StringValue"* inttoptr
-//          (i64 51107824 to %"struct.impala::StringValue"*), i32 0, i32 1)
-//   %6 = call i32 @IrCrcHash(i8* %4, i32 %5, i32 %0)
-//   br label %continue
-//
-// continue:                                         ; preds = %not_null, %null
-//   %7 = phi i32 [ %6, %not_null ], [ %3, %null ]
-//   ret i32 %7
-// }
-// TODO: can this be cross-compiled?
-Function* OldHashTable::CodegenHashCurrentRow(LlvmCodeGen* codegen) {
-  for (int i = 0; i < build_exprs_.size(); ++i) {
-    // Disable codegen for CHAR
-    if (build_exprs_[i]->type().type == TYPE_CHAR) return NULL;
-  }
-
-  // Get types to generate function prototype
-  Type* this_type = codegen->GetType(OldHashTable::LLVM_CLASS_NAME);
-  DCHECK(this_type != NULL);
-  PointerType* this_ptr_type = PointerType::get(this_type, 0);
-
-  LlvmCodeGen::FnPrototype prototype(codegen, "HashCurrentRow",
-      codegen->GetType(TYPE_INT));
-  prototype.AddArgument(LlvmCodeGen::NamedVariable("this_ptr", this_ptr_type));
-
-  LLVMContext& context = codegen->context();
-  LlvmBuilder builder(context);
-  Value* this_ptr;
-  Function* fn = prototype.GeneratePrototype(&builder, &this_ptr);
-
-  // Load expr_values_buffer_
-  Value* expr_values_buffer = codegen->CodegenCallFunction(&builder,
-      IRFunction::OLD_HASH_TABLE_GET_EXPR_VALUES_BUFFER, this_ptr, "expr_values_buffer");
-
-  Value* hash_result = codegen->GetIntConstant(TYPE_INT, initial_seed_);
-  if (var_result_begin_ == -1) {
-    // No variable length slots, just hash what is in 'expr_values_buffer_'
-    if (results_buffer_size_ > 0) {
-      Function* hash_fn = codegen->GetHashFunction(results_buffer_size_);
-      Value* len = codegen->GetIntConstant(TYPE_INT, results_buffer_size_);
-      hash_result = builder.CreateCall(hash_fn, {expr_values_buffer, len, hash_result});
-    }
-  } else {
-    if (var_result_begin_ > 0) {
-      Function* hash_fn = codegen->GetHashFunction(var_result_begin_);
-      Value* len = codegen->GetIntConstant(TYPE_INT, var_result_begin_);
-      hash_result = builder.CreateCall(hash_fn, {expr_values_buffer, len, hash_result});
-    }
-
-    // Load expr_value_null_bits_
-    Value* expr_value_null_bits = codegen->CodegenCallFunction(&builder,
-        IRFunction::OLD_HASH_TABLE_GET_EXPR_VALUE_NULL_BITS,
-        this_ptr, "expr_value_null_bits");
-
-    // Hash string slots
-    for (int i = 0; i < build_exprs_.size(); ++i) {
-      if (build_exprs_[i]->type().type != TYPE_STRING
-          && build_exprs_[i]->type().type != TYPE_VARCHAR) continue;
-
-      BasicBlock* null_block = NULL;
-      BasicBlock* not_null_block = NULL;
-      BasicBlock* continue_block = NULL;
-      Value* str_null_result = NULL;
-
-      Value* llvm_buffer_loc = builder.CreateInBoundsGEP(NULL, expr_values_buffer,
-          codegen->GetIntConstant(TYPE_INT, expr_values_buffer_offsets_[i]), "buffer_loc");
-
-      // If the hash table stores nulls, we need to check if the stringval
-      // evaluated to NULL
-      if (stores_nulls_) {
-        null_block = BasicBlock::Create(context, "null", fn);
-        not_null_block = BasicBlock::Create(context, "not_null", fn);
-        continue_block = BasicBlock::Create(context, "continue", fn);
-
-        // Load expr_values_null_bits_[i] and check if it's set.
-        Value* llvm_null_bits_loc = builder.CreateInBoundsGEP(NULL, expr_value_null_bits,
-            codegen->GetIntConstant(TYPE_INT, i), "null_bits_loc");
-        Value* null_bits = builder.CreateLoad(llvm_null_bits_loc);
-        Value* is_null = builder.CreateICmpNE(null_bits,
-            codegen->GetIntConstant(TYPE_TINYINT, 0));
-        builder.CreateCondBr(is_null, null_block, not_null_block);
-
-        // For null, we just want to call the hash function on a portion of the data.
-        builder.SetInsertPoint(null_block);
-        Function* null_hash_fn = codegen->GetHashFunction(sizeof(StringValue));
-        Value* len = codegen->GetIntConstant(TYPE_INT, sizeof(StringValue));
-        str_null_result = builder.CreateCall(null_hash_fn,
-            ArrayRef<Value*>({llvm_buffer_loc, len, hash_result}));
-        builder.CreateBr(continue_block);
-
-        builder.SetInsertPoint(not_null_block);
-      }
-
-      // Convert expr_values_buffer_ loc to llvm value
-      Value* str_val = builder.CreatePointerCast(llvm_buffer_loc,
-          codegen->GetPtrType(TYPE_STRING), "str_val");
-
-      Value* ptr = builder.CreateStructGEP(NULL, str_val, 0, "ptr");
-      Value* len = builder.CreateStructGEP(NULL, str_val, 1, "len");
-      ptr = builder.CreateLoad(ptr);
-      len = builder.CreateLoad(len);
-
-      // Call hash(ptr, len, hash_result);
-      Function* general_hash_fn = codegen->GetHashFunction();
-      Value* string_hash_result =
-          builder.CreateCall(general_hash_fn, ArrayRef<Value*>({ptr, len, hash_result}));
-
-      if (stores_nulls_) {
-        builder.CreateBr(continue_block);
-        builder.SetInsertPoint(continue_block);
-        // Use phi node to reconcile that we could have come from the string-null
-        // path and string not null paths.
-        PHINode* phi_node = builder.CreatePHI(codegen->GetType(TYPE_INT), 2);
-        phi_node->addIncoming(string_hash_result, not_null_block);
-        phi_node->addIncoming(str_null_result, null_block);
-        hash_result = phi_node;
-      } else {
-        hash_result = string_hash_result;
-      }
-    }
-  }
-
-  builder.CreateRet(hash_result);
-  return codegen->FinalizeFunction(fn);
-}
-
-bool OldHashTable::Equals(TupleRow* build_row) {
-  for (int i = 0; i < build_exprs_.size(); ++i) {
-    void* val = build_expr_evals_[i]->GetValue(build_row);
-    if (val == NULL) {
-      if (!(stores_nulls_ && finds_nulls_[i])) return false;
-      if (!expr_value_null_bits_[i]) return false;
-      continue;
-    } else {
-      if (expr_value_null_bits_[i]) return false;
-    }
-
-    void* loc = expr_values_buffer_ + expr_values_buffer_offsets_[i];
-    if (!RawValue::Eq(loc, val, build_exprs_[i]->type())) {
-      return false;
-    }
-  }
-  return true;
-}
-
-// Codegen for OldHashTable::Equals.  For a hash table with two exprs (string,int), the
-// IR looks like:
-//
-// define i1 @Equals(%"class.impala::OldHashTable"* %this_ptr,
-//                   %"class.impala::TupleRow"* %row) {
-// entry:
-//   %result = call i64 @GetSlotRef(%"class.impala::ScalarExpr"* inttoptr
-//                                  (i64 146381856 to %"class.impala::ScalarExpr"*),
-//                                  %"class.impala::TupleRow"* %row)
-//   %0 = trunc i64 %result to i1
-//   br i1 %0, label %null, label %not_null
-//
-// false_block:                            ; preds = %not_null2, %null1, %not_null, %null
-//   ret i1 false
-//
-// null:                                             ; preds = %entry
-//   br i1 false, label %continue, label %false_block
-//
-// not_null:                                         ; preds = %entry
-//   %1 = load i32* inttoptr (i64 104774368 to i32*)
-//   %2 = ashr i64 %result, 32
-//   %3 = trunc i64 %2 to i32
-//   %cmp_raw = icmp eq i32 %3, %1
-//   br i1 %cmp_raw, label %continue, label %false_block
-//
-// continue:                                         ; preds = %not_null, %null
-//   %result4 = call { i64, i8* } @GetSlotRef1(
-//       %"class.impala::ScalarExpr"* inttoptr
-//       (i64 146381696 to %"class.impala::ScalarExpr"*),
-//       %"class.impala::TupleRow"* %row)
-//   %4 = extractvalue { i64, i8* } %result4, 0
-//   %5 = trunc i64 %4 to i1
-//   br i1 %5, label %null1, label %not_null2
-//
-// null1:                                            ; preds = %continue
-//   br i1 false, label %continue3, label %false_block
-//
-// not_null2:                                        ; preds = %continue
-//   %6 = extractvalue { i64, i8* } %result4, 0
-//   %7 = ashr i64 %6, 32
-//   %8 = trunc i64 %7 to i32
-//   %result5 = extractvalue { i64, i8* } %result4, 1
-//   %cmp_raw6 = call i1 @_Z11StringValEQPciPKN6impala11StringValueE(
-//       i8* %result5, i32 %8, %"struct.impala::StringValue"* inttoptr
-//       (i64 104774384 to %"struct.impala::StringValue"*))
-//   br i1 %cmp_raw6, label %continue3, label %false_block
-//
-// continue3:                                        ; preds = %not_null2, %null1
-//   ret i1 true
-// }
-Function* OldHashTable::CodegenEquals(LlvmCodeGen* codegen) {
-  for (int i = 0; i < build_exprs_.size(); ++i) {
-    // Disable codegen for CHAR
-    if (build_exprs_[i]->type().type == TYPE_CHAR) return NULL;
-  }
-
-  // Get types to generate function prototype
-  Type* tuple_row_type = codegen->GetType(TupleRow::LLVM_CLASS_NAME);
-  DCHECK(tuple_row_type != NULL);
-  PointerType* tuple_row_ptr_type = PointerType::get(tuple_row_type, 0);
-
-  Type* this_type = codegen->GetType(OldHashTable::LLVM_CLASS_NAME);
-  DCHECK(this_type != NULL);
-  PointerType* this_ptr_type = PointerType::get(this_type, 0);
-
-  LlvmCodeGen::FnPrototype prototype(codegen, "Equals", codegen->GetType(TYPE_BOOLEAN));
-  prototype.AddArgument(LlvmCodeGen::NamedVariable("this_ptr", this_ptr_type));
-  prototype.AddArgument(LlvmCodeGen::NamedVariable("row", tuple_row_ptr_type));
-
-  LLVMContext& context = codegen->context();
-  LlvmBuilder builder(context);
-  Value* args[2];
-  Function* fn = prototype.GeneratePrototype(&builder, args);
-  Value* this_ptr = args[0];
-  Value* row = args[1];
-
-  if (!build_exprs_.empty()) {
-    BasicBlock* false_block = BasicBlock::Create(context, "false_block", fn);
-
-    // Load build_expr_evals_.data()
-    Value* eval_vector = codegen->CodegenCallFunction(&builder,
-        IRFunction::OLD_HASH_TABLE_GET_BUILD_EXPR_EVALUATORS,
-        this_ptr, "eval_vector");
-
-    // Load expr_values_buffer_
-    Value* expr_values_buffer = codegen->CodegenCallFunction(&builder,
-        IRFunction::OLD_HASH_TABLE_GET_EXPR_VALUES_BUFFER,
-        this_ptr, "expr_values_buffer");
-
-    // Load expr_value_null_bits_
-    Value* expr_value_null_bits = codegen->CodegenCallFunction(&builder,
-        IRFunction::OLD_HASH_TABLE_GET_EXPR_VALUE_NULL_BITS,
-        this_ptr, "expr_value_null_bits");
-
-    for (int i = 0; i < build_exprs_.size(); ++i) {
-      BasicBlock* null_block = BasicBlock::Create(context, "null", fn);
-      BasicBlock* not_null_block = BasicBlock::Create(context, "not_null", fn);
-      BasicBlock* continue_block = BasicBlock::Create(context, "continue", fn);
-
-      // Generate GetValue() of build_expr_evals_[i]
-      Function* expr_fn;
-      Status status = build_exprs_[i]->GetCodegendComputeFn(codegen, &expr_fn);
-      if (!status.ok()) {
-        fn->eraseFromParent(); // deletes function
-        VLOG_QUERY << "Failed to codegen Equals(): " << status.GetDetail();
-        return NULL;
-      }
-
-      // Call GetValue() on build_expr_evals_[i]
-      Value* eval_arg =
-          codegen->CodegenArrayAt(&builder, eval_vector, i, "eval");
-      CodegenAnyVal result = CodegenAnyVal::CreateCallWrapped(codegen, &builder,
-          build_exprs_[i]->type(), expr_fn, {eval_arg, row}, "result");
-      Value* is_null = result.GetIsNull();
-
-      // Determine if probe is null (i.e. expr_value_null_bits_[i] == true). In
-      // the case where the hash table does not store nulls, this is always false.
-      Value* probe_is_null = codegen->false_value();
-      if (stores_nulls_ && finds_nulls_[i]) {
-        Value* llvm_null_bits_loc = builder.CreateInBoundsGEP(NULL, expr_value_null_bits,
-            codegen->GetIntConstant(TYPE_INT, i), "null_bits_loc");
-        Value* null_bits = builder.CreateLoad(llvm_null_bits_loc, "null_bits");
-        probe_is_null = builder.CreateICmpNE(null_bits,
-            codegen->GetIntConstant(TYPE_TINYINT, 0));
-      }
-
-      // Get llvm value for probe_val from 'expr_values_buffer_'
-      Value* probe_val = builder.CreateInBoundsGEP(NULL, expr_values_buffer,
-          codegen->GetIntConstant(TYPE_INT, expr_values_buffer_offsets_[i]), "probe_val");
-      probe_val = builder.CreatePointerCast(
-          probe_val, codegen->GetPtrType(build_exprs_[i]->type()));
-
-      // Branch for GetValue() returning NULL
-      builder.CreateCondBr(is_null, null_block, not_null_block);
-
-      // Null block
-      builder.SetInsertPoint(null_block);
-      builder.CreateCondBr(probe_is_null, continue_block, false_block);
-
-      // Not-null block
-      builder.SetInsertPoint(not_null_block);
-      if (stores_nulls_) {
-        BasicBlock* cmp_block = BasicBlock::Create(context, "cmp", fn);
-        // First need to compare that probe_expr[i] is not null
-        builder.CreateCondBr(probe_is_null, false_block, cmp_block);
-        builder.SetInsertPoint(cmp_block);
-      }
-      // Check result == probe_val
-      Value* is_equal = result.EqToNativePtr(probe_val);
-      builder.CreateCondBr(is_equal, continue_block, false_block);
-
-      builder.SetInsertPoint(continue_block);
-    }
-    builder.CreateRet(codegen->true_value());
-
-    builder.SetInsertPoint(false_block);
-    builder.CreateRet(codegen->false_value());
-  } else {
-    builder.CreateRet(codegen->true_value());
-  }
-  return codegen->FinalizeFunction(fn);
-}
-
-void OldHashTable::ResizeBuckets(int64_t num_buckets) {
-  DCHECK_EQ((num_buckets & (num_buckets-1)), 0)
-      << "num_buckets=" << num_buckets << " must be a power of 2";
-
-  int64_t old_num_buckets = num_buckets_;
-  // This can be a rather large allocation so check the limit before (to prevent
-  // us from going over the limits too much).
-  int64_t delta_size = (num_buckets - old_num_buckets) * sizeof(Bucket);
-  if (!mem_tracker_->TryConsume(delta_size)) {
-    MemLimitExceeded(delta_size);
-    return;
-  }
-  buckets_.resize(num_buckets);
-
-  // If we're doubling the number of buckets, all nodes in a particular bucket
-  // either remain there, or move down to an analogous bucket in the other half.
-  // In order to efficiently check which of the two buckets a node belongs in, the number
-  // of buckets must be a power of 2.
-  bool doubled_buckets = (num_buckets == old_num_buckets * 2);
-  for (int i = 0; i < num_buckets_; ++i) {
-    Bucket* bucket = &buckets_[i];
-    Bucket* sister_bucket = &buckets_[i + old_num_buckets];
-    Node* last_node = NULL;
-    Node* node = bucket->node;
-
-    while (node != NULL) {
-      Node* next = node->next;
-      uint32_t hash = node->hash;
-
-      bool node_must_move;
-      Bucket* move_to;
-      if (doubled_buckets) {
-        node_must_move = ((hash & old_num_buckets) != 0);
-        move_to = sister_bucket;
-      } else {
-        int64_t bucket_idx = hash & (num_buckets - 1);
-        node_must_move = (bucket_idx != i);
-        move_to = &buckets_[bucket_idx];
-      }
-
-      if (node_must_move) {
-        MoveNode(bucket, move_to, node, last_node);
-      } else {
-        last_node = node;
-      }
-
-      node = next;
-    }
-  }
-
-  num_buckets_ = num_buckets;
-  num_buckets_till_resize_ = MAX_BUCKET_OCCUPANCY_FRACTION * num_buckets_;
-}
-
-void OldHashTable::GrowNodeArray() {
-  node_remaining_current_page_ = HT_PAGE_SIZE / sizeof(Node);
-  next_node_ = reinterpret_cast<Node*>(mem_pool_->Allocate(HT_PAGE_SIZE));
-  ++num_data_pages_;
-  if (ImpaladMetrics::HASH_TABLE_TOTAL_BYTES != NULL) {
-    ImpaladMetrics::HASH_TABLE_TOTAL_BYTES->Increment(HT_PAGE_SIZE);
-  }
-  if (mem_tracker_->LimitExceeded()) MemLimitExceeded(HT_PAGE_SIZE);
-}
-
-void OldHashTable::MemLimitExceeded(int64_t allocation_size) {
-  mem_limit_exceeded_ = true;
-  if (state_ != NULL) state_->SetMemLimitExceeded(mem_tracker_, allocation_size);
-}
-
-string OldHashTable::DebugString(bool skip_empty, bool show_match,
-    const RowDescriptor* desc) {
-  stringstream ss;
-  ss << endl;
-  for (int i = 0; i < buckets_.size(); ++i) {
-    Node* node = buckets_[i].node;
-    bool first = true;
-    if (skip_empty && node == NULL) continue;
-    ss << i << ": ";
-    while (node != NULL) {
-      if (!first) ss << ",";
-      ss << node << "(" << node->data << ")";
-      if (desc != NULL) ss << " " << PrintRow(GetRow(node), *desc);
-      if (show_match) {
-        if (node->matched) {
-          ss << " [M]";
-        } else {
-          ss << " [U]";
-        }
-      }
-      node = node->next;
-      first = false;
-    }
-    ss << endl;
-  }
-  return ss.str();
-}


[3/6] incubator-impala git commit: IMPALA-4674: Part 1: remove old aggs and joins

Posted by ta...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/507bd8be/be/src/exec/old-hash-table.h
----------------------------------------------------------------------
diff --git a/be/src/exec/old-hash-table.h b/be/src/exec/old-hash-table.h
deleted file mode 100644
index 406f360..0000000
--- a/be/src/exec/old-hash-table.h
+++ /dev/null
@@ -1,548 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-
-#ifndef IMPALA_EXEC_OLD_HASH_TABLE_H
-#define IMPALA_EXEC_OLD_HASH_TABLE_H
-
-#include <vector>
-#include <boost/cstdint.hpp>
-#include <boost/scoped_ptr.hpp>
-#include "codegen/impala-ir.h"
-#include "common/logging.h"
-#include "runtime/mem-pool.h"
-#include "util/hash-util.h"
-#include "util/runtime-profile.h"
-
-namespace llvm {
-  class Function;
-}
-
-namespace impala {
-
-class LlvmCodeGen;
-class MemTracker;
-class RuntimeFilter;
-class RowDescriptor;
-class RuntimeState;
-class ScalarExpr;
-class ScalarExprEvaluator;
-class Tuple;
-class TupleRow;
-
-/// TODO: Temporarily moving the old HashTable implementation to make it work with the
-/// non-partitioned versions of HJ and AGG. It should be removed once we remove those
-/// non-partitioned versions.
-
-/// Hash table implementation designed for hash aggregation and hash joins.  This is not
-/// templatized and is tailored to the usage pattern for aggregation and joins.  The
-/// hash table stores TupleRows and allows for different exprs for insertions and finds.
-/// This is the pattern we use for joins and aggregation where the input/build tuple
-/// row descriptor is different from the find/probe descriptor.
-/// The table is optimized for the query engine's use case as much as possible and is not
-/// intended to be a generic hash table implementation.  The API loosely mimics the
-/// std::hashset API.
-//
-/// The hash table stores evaluated expr results for the current row being processed
-/// when possible into a contiguous memory buffer. This allows for very efficient
-/// computation for hashing.  The implementation is also designed to allow codegen
-/// for some paths.
-//
-/// The hash table does not support removes. The hash table is not thread safe.
-//
-/// The implementation is based on the boost multiset.  The hashtable is implemented by
-/// two data structures: a vector of buckets and a vector of nodes.  Inserted values
-/// are stored as nodes (in the order they are inserted).  The buckets (indexed by the
-/// mod of the hash) contain pointers to the node vector.  Nodes that fall in the same
-/// bucket are linked together (the bucket pointer gets you the head of that linked list).
-/// When growing the hash table, the number of buckets is doubled, and nodes from a
-/// particular bucket either stay in place or move to an analogous bucket in the second
-/// half of buckets. This behavior allows us to avoid moving about half the nodes each
-/// time, and maintains good cache properties by only accessing 2 buckets at a time.
-/// The node vector is modified in place.
-/// Due to the doubling nature of the buckets, we require that the number of buckets is a
-/// power of 2. This allows us to determine if a node needs to move by simply checking a
-/// single bit, and further allows us to initially hash nodes using a bitmask.
-//
-/// TODO: this is not a fancy hash table in terms of memory access patterns (cuckoo-hashing
-/// or something that spills to disk). We will likely want to invest more time into this.
-/// TODO: hash-join and aggregation have very different access patterns.  Joins insert
-/// all the rows and then scan to find them.  Aggregation interleaves Find() and
-/// Insert().  We may want to optimize joins more heavily for Insert() (in particular
-/// growing).
-/// TODO: batched interface for inserts and finds.
-class OldHashTable {
- private:
-  struct Node;
-
- public:
-  class Iterator;
-
-  /// Create a hash table.
-  ///  - build_exprs are the exprs that should be used to evaluate rows during Insert().
-  ///  - probe_exprs are used during Find()
-  ///  - filter_exprs are used to build runtime filters.
-  ///  - num_build_tuples: number of Tuples in the build tuple row
-  ///  - stores_nulls: if false, TupleRows with nulls are ignored during Insert
-  ///  - finds_nulls: if finds_nulls[i] is false, Find() returns End() for TupleRows with
-  ///      nulls in position i even if stores_nulls is true.
-  ///  - num_buckets: number of buckets that the hash table should be initialized to
-  ///  - mem_tracker: if non-empty, all memory allocations for nodes and for buckets are
-  ///    tracked; the tracker must be valid until the d'tor is called
-  ///  - initial_seed: Initial seed value to use when computing hashes for rows
-  ///  - stores_tuples: If true, the hash table stores tuples, otherwise it stores tuple
-  ///    rows.
-  /// TODO: stores_nulls is too coarse: for a hash table in which some columns are joined
-  ///       with '<=>' and others with '=', stores_nulls could distinguish between columns
-  ///       in which nulls are stored and columns in which they are not, which could save
-  ///       space by not storing some rows we know will never match.
-  static Status Create(ObjectPool* pool, RuntimeState* state,
-      const std::vector<ScalarExpr*>& build_exprs,
-      const std::vector<ScalarExpr*>& probe_exprs,
-      const std::vector<ScalarExpr*>& filter_exprs,
-      int num_build_tuples, bool stores_nulls,
-      const std::vector<bool>& finds_nulls, int32_t initial_seed,
-      MemTracker* mem_tracker, const std::vector<RuntimeFilter*>& runtime_filters,
-      boost::scoped_ptr<OldHashTable>* hash_tbl_, bool stores_tuples = false,
-      int64_t num_buckets = 1024);
-
-  /// Initializes the evaluators for build, probe and filter expressions.
-  Status Open(RuntimeState* state);
-
-  /// Call to cleanup any resources. Must be called once.
-  void Close(RuntimeState* state);
-
-  /// Frees local allocations made by expression evaluators.
-  void FreeLocalAllocations();
-
-  /// Insert row into the hash table.  Row will be evaluated over build exprs.
-  /// This will grow the hash table if necessary.
-  /// If the hash table has gone over, or would need to go over, the mem limit, the
-  /// Insert will be ignored. The caller is assumed to periodically (e.g. per row
-  /// batch) check the limits to identify this case.
-  /// The 'row' is not copied by the hash table and the caller must guarantee it
-  /// stays in memory.
-  /// Returns false if there was not enough memory to insert the row.
-  bool IR_ALWAYS_INLINE Insert(TupleRow* row) {
-    if (UNLIKELY(mem_limit_exceeded_)) return false;
-    bool has_null = EvalBuildRow(row);
-    if (!stores_nulls_ && has_null) return true;
-
-    if (UNLIKELY(num_filled_buckets_ > num_buckets_till_resize_)) {
-      /// TODO: next prime instead of double?
-      ResizeBuckets(num_buckets_ * 2);
-      if (UNLIKELY(mem_limit_exceeded_)) return false;
-    }
-    return InsertImpl(row);
-  }
-
-  bool IR_ALWAYS_INLINE Insert(Tuple* tuple) {
-    if (UNLIKELY(mem_limit_exceeded_)) return false;
-    bool has_null = EvalBuildRow(reinterpret_cast<TupleRow*>(&tuple));
-    if (!stores_nulls_ && has_null) return true;
-
-    if (UNLIKELY(num_filled_buckets_ > num_buckets_till_resize_)) {
-      /// TODO: next prime instead of double?
-      ResizeBuckets(num_buckets_ * 2);
-      if (UNLIKELY(mem_limit_exceeded_)) return false;
-    }
-    return InsertImpl(tuple);
-  }
-
-  /// Evaluate and hash the build/probe row, returning in *hash. Returns false if this
-  /// row should be rejected (doesn't need to be processed further) because it
-  /// contains NULL.
-  /// These need to be inlined in the IR module so we can find and replace the calls to
-  /// EvalBuildRow()/EvalProbeRow().
-  bool IR_ALWAYS_INLINE EvalAndHashBuild(TupleRow* row, uint32_t* hash);
-  bool IR_ALWAYS_INLINE EvalAndHashProbe(TupleRow* row, uint32_t* hash);
-
-  /// Returns the start iterator for all rows that match 'probe_row'.  'probe_row' is
-  /// evaluated with probe exprs.  The iterator can be iterated until OldHashTable::End()
-  /// to find all the matching rows.
-  /// Only one scan can be in progress at any time (i.e. it is not legal to call
-  /// Find(), begin iterating through all the matches, call another Find(),
-  /// and then continue iterating with the iterator from the first scan).
-  /// Advancing the returned iterator will go to the next matching row.  The matching
-  /// rows are evaluated lazily (i.e. computed as the Iterator is moved).
-  /// Returns OldHashTable::End() if there is no match.
-  Iterator IR_ALWAYS_INLINE Find(TupleRow* probe_row);
-
-  /// Returns number of elements in the hash table
-  int64_t size() const { return num_nodes_; }
-
-  /// Returns the number of buckets
-  int64_t num_buckets() const { return buckets_.size(); }
-
-  /// Returns the load factor (the fraction of buckets that are non-empty)
-  float load_factor() const {
-    return num_filled_buckets_ / static_cast<float>(buckets_.size());
-  }
-
-  /// Returns an estimate of the number of bytes needed to build the hash table
-  /// structure for 'num_rows'.
-  static int64_t EstimateSize(int64_t num_rows) {
-    // Assume 50% fill factor.
-    int64_t num_buckets = num_rows * 2;
-    return num_buckets * sizeof(Bucket) + num_rows * sizeof(Node);
-  }
-
-  /// Returns the number of bytes allocated to the hash table
-  int64_t byte_size() const {
-    int64_t nodes_mem = (num_nodes_ + node_remaining_current_page_) * sizeof(Node);
-    return nodes_mem + sizeof(Bucket) * buckets_.capacity();
-  }
-
-  bool mem_limit_exceeded() const { return mem_limit_exceeded_; }
-
-  /// Returns the results of the exprs at 'expr_idx' evaluated over the last row
-  /// processed by the OldHashTable.
-  /// This value is invalid if the expr evaluated to NULL.
-  /// TODO: this is an awkward abstraction but aggregation node can take advantage of
-  /// it and save some expr evaluation calls.
-  void* last_expr_value(int expr_idx) const {
-    return expr_values_buffer_ + expr_values_buffer_offsets_[expr_idx];
-  }
-
-  /// Returns if the expr at 'expr_idx' evaluated to NULL for the last row.
-  bool last_expr_value_null(int expr_idx) const {
-    return expr_value_null_bits_[expr_idx];
-  }
-
-  /// Can be called after all insert calls to generate runtime filters, which are then
-  /// published to the local runtime state's RuntimeFilterBank. Returns the number of
-  /// filters that are enabled.
-  int AddBloomFilters();
-
-  /// Returns an iterator at the beginning of the hash table.  Advancing this iterator
-  /// will traverse all elements.
-  Iterator Begin();
-
-  /// Return an iterator pointing to the first element in the hash table that does not
-  /// have its matched flag set. Used in right-outer and full-outer joins.
-  Iterator FirstUnmatched();
-
-  /// Returns end marker
-  Iterator End() { return Iterator(); }
-
-  /// Codegen for evaluating a tuple row.  Codegen'd function matches the signature
-  /// for EvalBuildRow and EvalProbeRow.
-  /// If build_row is true, the codegen uses the build_exprs, otherwise the probe_exprs.
-  llvm::Function* CodegenEvalTupleRow(LlvmCodeGen* codegen, bool build_row);
-
-  /// Codegen for hashing the expr values in 'expr_values_buffer_'.  Function
-  /// prototype matches HashCurrentRow identically.
-  llvm::Function* CodegenHashCurrentRow(LlvmCodeGen* codegen);
-
-  /// Codegen for evaluating a TupleRow and comparing equality against
-  /// 'expr_values_buffer_'.  Function signature matches OldHashTable::Equals()
-  llvm::Function* CodegenEquals(LlvmCodeGen* codegen);
-
-  static const char* LLVM_CLASS_NAME;
-
-  /// Dump out the entire hash table to string.  If skip_empty, empty buckets are
-  /// skipped.  If show_match, it also prints the matched flag of each node. If build_desc
-  /// is non-null, the build rows will be output.  Otherwise just the build row addresses.
-  std::string DebugString(bool skip_empty, bool show_match,
-      const RowDescriptor* build_desc);
-
-  /// stl-like iterator interface.
-  class Iterator {
-   public:
-    Iterator() : table_(NULL), bucket_idx_(-1), node_(NULL) {
-    }
-
-    /// Iterates to the next element.  In the case where the iterator was
-    /// from a Find, this will lazily evaluate that bucket, only returning
-    /// TupleRows that match the current scan row. No-op if the iterator is at the end.
-    template<bool check_match>
-    void IR_ALWAYS_INLINE Next();
-
-    /// Iterates to the next element that does not have its matched flag set. Returns false
-    /// if it reaches the end of the table without finding an unmatched element. Used in
-    /// right-outer and full-outer joins.
-    bool NextUnmatched();
-
-    /// Returns the current row. Callers must check the iterator is not AtEnd() before
-    /// calling GetRow().
-    TupleRow* GetRow() {
-      DCHECK(!AtEnd());
-      DCHECK(!table_->stores_tuples_);
-      return reinterpret_cast<TupleRow*>(node_->data);
-    }
-
-    Tuple* GetTuple() {
-      DCHECK(!AtEnd());
-      DCHECK(table_->stores_tuples_);
-      return reinterpret_cast<Tuple*>(node_->data);
-    }
-
-    void set_matched(bool v) {
-      DCHECK(!AtEnd());
-      node_->matched = v;
-    }
-
-    bool matched() const {
-      DCHECK(!AtEnd());
-      return node_->matched;
-    }
-
-    void reset() {
-      bucket_idx_ = -1;
-      node_ = NULL;
-    }
-
-    /// Returns true if this iterator is at the end, i.e. GetRow() cannot be called.
-    bool AtEnd() const { return node_ == NULL; }
-    bool operator!=(const Iterator& rhs) { return !(*this == rhs); }
-
-    bool operator==(const Iterator& rhs) {
-      return bucket_idx_ == rhs.bucket_idx_ && node_ == rhs.node_;
-    }
-
-   private:
-    friend class OldHashTable;
-
-    Iterator(OldHashTable* table, int bucket_idx, Node* node, uint32_t hash) :
-      table_(table),
-      bucket_idx_(bucket_idx),
-      node_(node),
-      scan_hash_(hash) {
-    }
-
-    OldHashTable* table_;
-
-    /// Current bucket idx
-    int64_t bucket_idx_;
-
-    /// Current node (within the current bucket's chain)
-    Node* node_;
-
-    /// Cached hash value for the row passed to Find()
-    uint32_t scan_hash_;
-  };
-
- private:
-  friend class Iterator;
-  friend class OldHashTableTest;
-
-  /// TODO: bit pack this struct. The default alignment makes this struct have a lot
-  /// of wasted bytes.
-  struct Node {
-    /// Only used for full/right outer hash join to indicate if this row has been
-    /// matched.
-    /// From an abstraction point of view, this is an awkward place to store this
-    /// information but it is very efficient here. This space is otherwise unused
-    /// (and we can bitpack this more in the future).
-    bool matched;
-
-    uint32_t hash;   // Cache of the hash for 'data'
-    Node* next;      // Chain to next node for collisions
-    void* data;      // Either the Tuple* or TupleRow*
-  };
-
-  struct Bucket {
-    Node* node;
-    Bucket() : node(NULL) { }
-  };
-
-  /// Use Create() instead.
-  OldHashTable(RuntimeState* state, const std::vector<ScalarExpr*>& build_exprs,
-      const std::vector<ScalarExpr*>& probe_exprs,
-      const std::vector<ScalarExpr*>& filter_exprs, int num_build_tuples,
-      bool stores_nulls, const std::vector<bool>& finds_nulls, int32_t initial_seed,
-      MemTracker* mem_tracker, const std::vector<RuntimeFilter*>& filters,
-      bool stores_tuples, int64_t num_buckets);
-
-  Status Init(ObjectPool* pool, RuntimeState* state);
-
-  /// Simple wrappers to return various fields in this class. These functions are
-  /// cross-compiled and they exist to avoid the need to make assumption about the
-  /// order of declaration of these fields when generating the handcrafted IR.
-  uint8_t* IR_ALWAYS_INLINE expr_values_buffer() const;
-  uint8_t* IR_ALWAYS_INLINE expr_value_null_bits() const;
-  ScalarExprEvaluator* const* IR_ALWAYS_INLINE build_expr_evals() const;
-  ScalarExprEvaluator* const* IR_ALWAYS_INLINE probe_expr_evals() const;
-
-  /// Returns the next non-empty bucket and updates idx to be the index of that bucket.
-  /// If there are no more buckets, returns NULL and sets idx to -1
-  Bucket* NextBucket(int64_t* bucket_idx);
-
-  /// Resize the hash table to 'num_buckets'
-  void ResizeBuckets(int64_t num_buckets);
-
-  /// Insert row into the hash table
-  bool IR_ALWAYS_INLINE InsertImpl(void* data);
-
-  /// Chains 'node' to 'bucket'.  Nodes in a bucket are chained
-  /// as a linked list; this places the new node at the beginning of the list.
-  void AddToBucket(Bucket* bucket, Node* node);
-
-  /// Moves a node from one bucket to another. 'previous_node' refers to the
-  /// node (if any) that's chained before this node in from_bucket's linked list.
-  void MoveNode(Bucket* from_bucket, Bucket* to_bucket, Node* node,
-                Node* previous_node);
-
-  /// Evaluate the exprs over row and cache the results in 'expr_values_buffer_'.
-  /// Returns whether any expr evaluated to NULL
-  /// This will be replaced by codegen
-  bool EvalRow(TupleRow* row, const std::vector<ScalarExprEvaluator*>& evals);
-
-  /// Evaluate 'row' over build exprs, caching the results in 'expr_values_buffer_'.
-  /// This will be replaced by codegen.  We do not want this function inlined when cross
-  /// compiled because we need to be able to differentiate between EvalBuildRow and
-  /// EvalProbeRow by name and the build/probe exprs are baked into the codegen'd function.
-  bool IR_NO_INLINE EvalBuildRow(TupleRow* row) {
-    return EvalRow(row, build_expr_evals_);
-  }
-
-  /// Evaluate 'row' over probe exprs caching the results in 'expr_values_buffer_'
-  /// This will be replaced by codegen.
-  bool IR_NO_INLINE EvalProbeRow(TupleRow* row) {
-    return EvalRow(row, probe_expr_evals_);
-  }
-
-  /// Compute the hash of the values in expr_values_buffer_.
-  /// This will be replaced by codegen.  We don't want this inlined for replacing
-  /// with codegen'd functions so the function name does not change.
-  uint32_t IR_NO_INLINE HashCurrentRow() {
-    if (var_result_begin_ == -1) {
-      /// This handles NULLs implicitly since a constant seed value was put
-      /// into results buffer for nulls.
-      return HashUtil::Hash(expr_values_buffer_, results_buffer_size_, initial_seed_);
-    } else {
-      return HashVariableLenRow();
-    }
-  }
-
-  TupleRow* GetRow(Node* node) const {
-    if (stores_tuples_) {
-      return reinterpret_cast<TupleRow*>(&node->data);
-    } else {
-      return reinterpret_cast<TupleRow*>(node->data);
-    }
-  }
-
-  /// Compute the hash of the values in expr_values_buffer_ for rows with variable length
-  /// fields (e.g. strings)
-  uint32_t HashVariableLenRow();
-
-  /// Returns true if the values of build_exprs evaluated over 'build_row' equal
-  /// the values cached in expr_values_buffer_
-  /// This will be replaced by codegen.
-  bool Equals(TupleRow* build_row);
-
-  /// Grow the node array.
-  void GrowNodeArray();
-
-  /// Sets mem_limit_exceeded_ to true and MEM_LIMIT_EXCEEDED for the query.
-  /// allocation_size is the attempted size of the allocation that would have
-  /// brought us over the mem limit.
-  void MemLimitExceeded(int64_t allocation_size);
-
-  /// Load factor that will trigger growing the hash table on insert.  This is
-  /// defined as the number of non-empty buckets / total_buckets
-  static const float MAX_BUCKET_OCCUPANCY_FRACTION;
-
-  RuntimeState* state_;
-
-  /// References to the build expressions evaluated on each build-row during insertion
-  /// and lookup, the probe expressions used during lookup and the filter expression,
-  /// one per filter in filters_, evaluated per build row to produce the value with
-  /// which to update the corresponding filter.
-  const std::vector<ScalarExpr*>& build_exprs_;
-  const std::vector<ScalarExpr*>& probe_exprs_;
-  const std::vector<ScalarExpr*>& filter_exprs_;
-
-  /// Evaluators for the expressions above.
-  std::vector<ScalarExprEvaluator*> build_expr_evals_;
-  std::vector<ScalarExprEvaluator*> probe_expr_evals_;
-  std::vector<ScalarExprEvaluator*> filter_expr_evals_;
-
-  /// List of filters to build during build phase.
-  std::vector<RuntimeFilter*> filters_;
-
-  /// Number of Tuple* in the build tuple row
-  const int num_build_tuples_;
-
-  /// Constants on how the hash table should behave. Joins and aggs have slightly
-  /// different behavior.
-  /// TODO: these constants are an ideal candidate to be removed with codegen.
-  const bool stores_nulls_;
-  const std::vector<bool> finds_nulls_;
-
-  /// finds_some_nulls_ is just the logical OR of finds_nulls_.
-  const bool finds_some_nulls_;
-
-  const bool stores_tuples_;
-
-  const int32_t initial_seed_;
-
-  /// Number of non-empty buckets.  Used to determine when to grow and rehash
-  int64_t num_filled_buckets_;
-
-  /// number of nodes stored (i.e. size of hash table)
-  int64_t num_nodes_;
-
-  /// MemPool used to allocate data pages.
-  boost::scoped_ptr<MemPool> mem_pool_;
-
-  /// Number of data pages for nodes.
-  int num_data_pages_;
-
-  /// Next node to insert.
-  Node* next_node_;
-
-  /// Number of nodes left in the current page.
-  int node_remaining_current_page_;
-
-  MemTracker* mem_tracker_;
-
-  /// Set to true if the hash table exceeds the memory limit. If this is set,
-  /// subsequent calls to Insert() will be ignored.
-  bool mem_limit_exceeded_;
-
-  std::vector<Bucket> buckets_;
-
-  /// equal to buckets_.size() but more efficient than the size function
-  int64_t num_buckets_;
-
-  /// The number of filled buckets to trigger a resize.  This is cached for efficiency
-  int64_t num_buckets_till_resize_;
-
-  /// Cache of exprs values for the current row being evaluated.  This can either
-  /// be a build row (during Insert()) or probe row (during Find()).
-  std::vector<int> expr_values_buffer_offsets_;
-
-  /// byte offset into expr_values_buffer_ that begins the variable length results
-  int var_result_begin_;
-
-  /// byte size of 'expr_values_buffer_'
-  int results_buffer_size_;
-
-  /// buffer to store evaluated expr results.  This address must not change once
-  /// allocated since the address is baked into the codegen
-  uint8_t* expr_values_buffer_;
-
-  /// Use bytes instead of bools to be compatible with llvm.  This address must
-  /// not change once allocated.
-  uint8_t* expr_value_null_bits_;
-};
-
-}
-
-#endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/507bd8be/be/src/exec/old-hash-table.inline.h
----------------------------------------------------------------------
diff --git a/be/src/exec/old-hash-table.inline.h b/be/src/exec/old-hash-table.inline.h
deleted file mode 100644
index e2bbe25..0000000
--- a/be/src/exec/old-hash-table.inline.h
+++ /dev/null
@@ -1,189 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-
-#ifndef IMPALA_EXEC_OLD_HASH_TABLE_INLINE_H
-#define IMPALA_EXEC_OLD_HASH_TABLE_INLINE_H
-
-#include "exec/old-hash-table.h"
-
-namespace impala {
-
-inline OldHashTable::Iterator OldHashTable::Find(TupleRow* probe_row) {
-  uint32_t hash;
-  if (!EvalAndHashProbe(probe_row, &hash)) return End();
-  int64_t bucket_idx = hash & (num_buckets_ - 1);
-  Bucket* bucket = &buckets_[bucket_idx];
-  Node* node = bucket->node;
-  while (node != NULL) {
-    if (node->hash == hash && Equals(GetRow(node))) {
-      return Iterator(this, bucket_idx, node, hash);
-    }
-    node = node->next;
-  }
-  return End();
-}
-
-inline OldHashTable::Iterator OldHashTable::Begin() {
-  int64_t bucket_idx = -1;
-  Bucket* bucket = NextBucket(&bucket_idx);
-  if (bucket != NULL) return Iterator(this, bucket_idx, bucket->node, 0);
-  return End();
-}
-
-inline OldHashTable::Iterator OldHashTable::FirstUnmatched() {
-  int64_t bucket_idx = -1;
-  Bucket* bucket = NextBucket(&bucket_idx);
-  while (bucket != NULL) {
-    Node* node = bucket->node;
-    while (node != NULL && node->matched) {
-      node = node->next;
-    }
-    if (node == NULL) {
-      bucket = NextBucket(&bucket_idx);
-    } else {
-      DCHECK(!node->matched);
-      return Iterator(this, bucket_idx, node, 0);
-    }
-  }
-  return End();
-}
-
-inline OldHashTable::Bucket* OldHashTable::NextBucket(int64_t* bucket_idx) {
-  ++*bucket_idx;
-  for (; *bucket_idx < num_buckets_; ++*bucket_idx) {
-    if (buckets_[*bucket_idx].node != NULL) return &buckets_[*bucket_idx];
-  }
-  *bucket_idx = -1;
-  return NULL;
-}
-
-inline bool OldHashTable::InsertImpl(void* data) {
-  uint32_t hash = HashCurrentRow();
-  int64_t bucket_idx = hash & (num_buckets_ - 1);
-  if (node_remaining_current_page_ == 0) {
-    GrowNodeArray();
-    if (UNLIKELY(mem_limit_exceeded_)) return false;
-  }
-  next_node_->hash = hash;
-  next_node_->data = data;
-  next_node_->matched = false;
-  AddToBucket(&buckets_[bucket_idx], next_node_);
-  DCHECK_GT(node_remaining_current_page_, 0);
-  --node_remaining_current_page_;
-  ++next_node_;
-  ++num_nodes_;
-  return true;
-}
-
-inline void OldHashTable::AddToBucket(Bucket* bucket, Node* node) {
-  num_filled_buckets_ += (bucket->node == NULL);
-  node->next = bucket->node;
-  bucket->node = node;
-}
-
-inline bool OldHashTable::EvalAndHashBuild(TupleRow* row, uint32_t* hash) {
-  bool has_null = EvalBuildRow(row);
-  if (!stores_nulls_ && has_null) return false;
-  *hash = HashCurrentRow();
-  return true;
-}
-
-inline bool OldHashTable::EvalAndHashProbe(TupleRow* row, uint32_t* hash) {
-  bool has_null = EvalProbeRow(row);
-  if (has_null && !(stores_nulls_ && finds_some_nulls_)) return false;
-  *hash = HashCurrentRow();
-  return true;
-}
-
-inline void OldHashTable::MoveNode(Bucket* from_bucket, Bucket* to_bucket,
-    Node* node, Node* previous_node) {
-  if (previous_node != NULL) {
-    previous_node->next = node->next;
-  } else {
-    // Update bucket directly
-    from_bucket->node = node->next;
-    num_filled_buckets_ -= (node->next == NULL);
-  }
-  AddToBucket(to_bucket, node);
-}
-
-template<bool check_match>
-inline void OldHashTable::Iterator::Next() {
-  if (bucket_idx_ == -1) return;
-
-  // TODO: this should prefetch the next tuplerow
-  // Iterator is not from a full table scan, evaluate equality now.  Only the current
-  // bucket needs to be scanned. 'expr_values_buffer_' contains the results
-  // for the current probe row.
-  if (check_match) {
-    // TODO: this should prefetch the next node
-    Node* node = node_->next;
-    while (node != NULL) {
-      if (node->hash == scan_hash_ && table_->Equals(table_->GetRow(node))) {
-        node_ = node;
-        return;
-      }
-      node = node->next;
-    }
-    *this = table_->End();
-  } else {
-    // Move onto the next chained node
-    if (node_->next != NULL) {
-      node_ = node_->next;
-      return;
-    }
-
-    // Move onto the next bucket
-    Bucket* bucket = table_->NextBucket(&bucket_idx_);
-    if (bucket == NULL) {
-      bucket_idx_ = -1;
-      node_ = NULL;
-    } else {
-      node_ = bucket->node;
-    }
-  }
-}
-
-inline bool OldHashTable::Iterator::NextUnmatched() {
-  if (bucket_idx_ == -1) return false;
-  while (true) {
-    while (node_->next != NULL && node_->next->matched) {
-      node_ = node_->next;
-    }
-    if (node_->next == NULL) {
-      // Move onto the next bucket.
-      Bucket* bucket = table_->NextBucket(&bucket_idx_);
-      if (bucket == NULL) {
-        bucket_idx_ = -1;
-        node_ = NULL;
-        return false;
-      } else {
-        node_ = bucket->node;
-        if (node_ != NULL && !node_->matched) return true;
-      }
-    } else {
-      DCHECK(!node_->next->matched);
-      node_ = node_->next;
-      return true;
-    }
-  }
-}
-
-}
-
-#endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/507bd8be/be/src/exec/partitioned-hash-join-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-hash-join-node.cc b/be/src/exec/partitioned-hash-join-node.cc
index 0f731d3..a5c9897 100644
--- a/be/src/exec/partitioned-hash-join-node.cc
+++ b/be/src/exec/partitioned-hash-join-node.cc
@@ -211,11 +211,6 @@ Status PartitionedHashJoinNode::Reset(RuntimeState* state) {
   return ExecNode::Reset(state);
 }
 
-Status PartitionedHashJoinNode::ProcessBuildInput(RuntimeState* state) {
-  DCHECK(false) << "Should not be called, PHJ uses the BuildSink API";
-  return Status::OK();
-}
-
 void PartitionedHashJoinNode::CloseAndDeletePartitions() {
   // Close all the partitions and clean up all references to them.
   for (unique_ptr<ProbePartition>& partition : probe_hash_partitions_) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/507bd8be/be/src/exec/partitioned-hash-join-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-hash-join-node.h b/be/src/exec/partitioned-hash-join-node.h
index 41493d0..73e0dd5 100644
--- a/be/src/exec/partitioned-hash-join-node.h
+++ b/be/src/exec/partitioned-hash-join-node.h
@@ -120,7 +120,6 @@ class PartitionedHashJoinNode : public BlockingJoinNode {
   virtual Status QueryMaintenance(RuntimeState* state) override;
   virtual void AddToDebugString(
       int indentation_level, std::stringstream* out) const override;
-  virtual Status ProcessBuildInput(RuntimeState* state) override;
 
   // Safe to close the build side early because we rematerialize the build rows always.
   virtual bool CanCloseBuildEarly() const override { return true; }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/507bd8be/be/src/exprs/agg-fn-evaluator.cc
----------------------------------------------------------------------
diff --git a/be/src/exprs/agg-fn-evaluator.cc b/be/src/exprs/agg-fn-evaluator.cc
index 860c78d..3a782be 100644
--- a/be/src/exprs/agg-fn-evaluator.cc
+++ b/be/src/exprs/agg-fn-evaluator.cc
@@ -21,7 +21,6 @@
 
 #include "codegen/llvm-codegen.h"
 #include "common/logging.h"
-#include "exec/aggregation-node.h"
 #include "exprs/aggregate-functions.h"
 #include "exprs/anyval-util.h"
 #include "exprs/scalar-expr.h"

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/507bd8be/be/src/runtime/row-batch.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/row-batch.cc b/be/src/runtime/row-batch.cc
index fbe1b94..11cf363 100644
--- a/be/src/runtime/row-batch.cc
+++ b/be/src/runtime/row-batch.cc
@@ -34,10 +34,6 @@
 
 #include "common/names.h"
 
-// Used to determine memory ownership of a RowBatch's tuple pointers.
-DECLARE_bool(enable_partitioned_hash_join);
-DECLARE_bool(enable_partitioned_aggregation);
-
 namespace impala {
 
 const int RowBatch::AT_CAPACITY_MEM_USAGE;
@@ -58,13 +54,9 @@ RowBatch::RowBatch(const RowDescriptor* row_desc, int capacity, MemTracker* mem_
   tuple_ptrs_size_ = capacity * num_tuples_per_row_ * sizeof(Tuple*);
   DCHECK_GT(tuple_ptrs_size_, 0);
   // TODO: switch to Init() pattern so we can check memory limit and return Status.
-  if (FLAGS_enable_partitioned_aggregation && FLAGS_enable_partitioned_hash_join) {
-    mem_tracker_->Consume(tuple_ptrs_size_);
-    tuple_ptrs_ = reinterpret_cast<Tuple**>(malloc(tuple_ptrs_size_));
-    DCHECK(tuple_ptrs_ != NULL);
-  } else {
-    tuple_ptrs_ = reinterpret_cast<Tuple**>(tuple_data_pool_.Allocate(tuple_ptrs_size_));
-  }
+  mem_tracker_->Consume(tuple_ptrs_size_);
+  tuple_ptrs_ = reinterpret_cast<Tuple**>(malloc(tuple_ptrs_size_));
+  DCHECK(tuple_ptrs_ != NULL);
 }
 
 // TODO: we want our input_batch's tuple_data to come from our (not yet implemented)
@@ -89,13 +81,9 @@ RowBatch::RowBatch(
   DCHECK_EQ(input_batch.row_tuples.size(), row_desc->tuple_descriptors().size());
   DCHECK_GT(tuple_ptrs_size_, 0);
   // TODO: switch to Init() pattern so we can check memory limit and return Status.
-  if (FLAGS_enable_partitioned_aggregation && FLAGS_enable_partitioned_hash_join) {
-    mem_tracker_->Consume(tuple_ptrs_size_);
-    tuple_ptrs_ = reinterpret_cast<Tuple**>(malloc(tuple_ptrs_size_));
-    DCHECK(tuple_ptrs_ != NULL);
-  } else {
-    tuple_ptrs_ = reinterpret_cast<Tuple**>(tuple_data_pool_.Allocate(tuple_ptrs_size_));
-  }
+  mem_tracker_->Consume(tuple_ptrs_size_);
+  tuple_ptrs_ = reinterpret_cast<Tuple**>(malloc(tuple_ptrs_size_));
+  DCHECK(tuple_ptrs_ != NULL);
   uint8_t* tuple_data;
   if (input_batch.compression_type != THdfsCompression::NONE) {
     DCHECK_EQ(THdfsCompression::LZ4, input_batch.compression_type)
@@ -166,12 +154,10 @@ RowBatch::~RowBatch() {
     ExecEnv::GetInstance()->buffer_pool()->FreeBuffer(
         buffer_info.client, &buffer_info.buffer);
   }
-  if (FLAGS_enable_partitioned_aggregation && FLAGS_enable_partitioned_hash_join) {
-    DCHECK(tuple_ptrs_ != NULL);
-    free(tuple_ptrs_);
-    mem_tracker_->Release(tuple_ptrs_size_);
-    tuple_ptrs_ = NULL;
-  }
+  DCHECK(tuple_ptrs_ != NULL);
+  free(tuple_ptrs_);
+  mem_tracker_->Release(tuple_ptrs_size_);
+  tuple_ptrs_ = NULL;
 }
 
 Status RowBatch::Serialize(TRowBatch* output_batch) {
@@ -346,9 +332,6 @@ void RowBatch::Reset() {
   }
   buffers_.clear();
   auxiliary_mem_usage_ = 0;
-  if (!FLAGS_enable_partitioned_aggregation || !FLAGS_enable_partitioned_hash_join) {
-    tuple_ptrs_ = reinterpret_cast<Tuple**>(tuple_data_pool_.Allocate(tuple_ptrs_size_));
-  }
   flush_ = FlushMode::NO_FLUSH_RESOURCES;
   needs_deep_copy_ = false;
 }
@@ -373,10 +356,6 @@ void RowBatch::TransferResourceOwnership(RowBatch* dest) {
   } else if (flush_ == FlushMode::FLUSH_RESOURCES) {
     dest->MarkFlushResources();
   }
-  if (!FLAGS_enable_partitioned_aggregation || !FLAGS_enable_partitioned_hash_join) {
-    // Tuple pointers were allocated from tuple_data_pool_ so are transferred.
-    tuple_ptrs_ = NULL;
-  }
   Reset();
 }
 
@@ -399,14 +378,8 @@ void RowBatch::AcquireState(RowBatch* src) {
 
   num_rows_ = src->num_rows_;
   capacity_ = src->capacity_;
-  if (!FLAGS_enable_partitioned_aggregation || !FLAGS_enable_partitioned_hash_join) {
-    // Tuple pointers are allocated from tuple_data_pool_ so are transferred.
-    tuple_ptrs_ = src->tuple_ptrs_;
-    src->tuple_ptrs_ = NULL;
-  } else {
-    // tuple_ptrs_ were allocated with malloc so can be swapped between batches.
-    std::swap(tuple_ptrs_, src->tuple_ptrs_);
-  }
+  // tuple_ptrs_ were allocated with malloc so can be swapped between batches.
+  std::swap(tuple_ptrs_, src->tuple_ptrs_);
   src->TransferResourceOwnership(this);
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/507bd8be/be/src/runtime/row-batch.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/row-batch.h b/be/src/runtime/row-batch.h
index 3cf1093..1b75ebb 100644
--- a/be/src/runtime/row-batch.h
+++ b/be/src/runtime/row-batch.h
@@ -398,19 +398,10 @@ class RowBatch {
   /// Array of pointers with InitialCapacity() * num_tuples_per_row_ elements.
   /// The memory ownership depends on whether legacy joins and aggs are enabled.
   ///
-  /// Memory is malloc'd and owned by RowBatch:
-  /// If enable_partitioned_hash_join=true and enable_partitioned_aggregation=true
-  /// then the memory is owned by this RowBatch and is freed upon its destruction.
-  /// This mode is more performant especially with SubplanNodes in the ExecNode tree
-  /// because the tuple pointers are not transferred and do not have to be re-created
-  /// in every Reset().
-  ///
-  /// Memory is allocated from MemPool:
-  /// Otherwise, the memory is allocated from tuple_data_pool_. As a result, the
-  /// pointer memory is transferred just like tuple data, and must be re-created
-  /// in Reset(). This mode is required for the legacy join and agg which rely on
-  /// the tuple pointers being allocated from the tuple_data_pool_, so they can
-  /// acquire ownership of the tuple pointers.
+  /// Memory is malloc'd and owned by RowBatch and is freed upon its destruction. This is
+  /// more performant than allocating the pointers from 'tuple_data_pool_' especially
+  /// with SubplanNodes in the ExecNode tree because the tuple pointers are not
+  /// transferred and do not have to be re-created in every Reset().
   int tuple_ptrs_size_;
   Tuple** tuple_ptrs_;
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/507bd8be/be/src/util/backend-gflag-util.cc
----------------------------------------------------------------------
diff --git a/be/src/util/backend-gflag-util.cc b/be/src/util/backend-gflag-util.cc
index 5523735..598ba08 100644
--- a/be/src/util/backend-gflag-util.cc
+++ b/be/src/util/backend-gflag-util.cc
@@ -27,7 +27,6 @@
 DECLARE_bool(load_catalog_in_background);
 DECLARE_bool(load_auth_to_local_rules);
 DECLARE_bool(enable_stats_extrapolation);
-DECLARE_bool(enable_partitioned_hash_join);
 DECLARE_int32(non_impala_java_vlog);
 DECLARE_int32(read_size);
 DECLARE_int32(num_metadata_loading_threads);
@@ -73,7 +72,6 @@ Status GetThriftBackendGflags(JNIEnv* jni_env, jbyteArray* cfg_bytes) {
   cfg.__set_lineage_event_log_dir(FLAGS_lineage_event_log_dir);
   cfg.__set_local_library_path(FLAGS_local_library_dir);
   cfg.__set_kudu_operation_timeout_ms(FLAGS_kudu_operation_timeout_ms);
-  cfg.__set_enable_partitioned_hash_join(FLAGS_enable_partitioned_hash_join);
   cfg.__set_sentry_catalog_polling_frequency_s(FLAGS_sentry_catalog_polling_frequency_s);
   RETURN_IF_ERROR(SerializeThriftMsg(jni_env, &cfg, cfg_bytes));
   return Status::OK();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/507bd8be/common/thrift/BackendGflags.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/BackendGflags.thrift b/common/thrift/BackendGflags.thrift
index b5f6838..45c9133 100644
--- a/common/thrift/BackendGflags.thrift
+++ b/common/thrift/BackendGflags.thrift
@@ -55,9 +55,7 @@ struct TBackendGflags {
 
   17: required i32 initial_hms_cnxn_timeout_s
 
-  18: required bool enable_partitioned_hash_join
+  18: required bool enable_stats_extrapolation
 
-  19: required bool enable_stats_extrapolation
-
-  20: required i64 sentry_catalog_polling_frequency_s
+  19: required i64 sentry_catalog_polling_frequency_s
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/507bd8be/fe/src/main/java/org/apache/impala/planner/JoinNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/JoinNode.java b/fe/src/main/java/org/apache/impala/planner/JoinNode.java
index 04dc40a..030d706 100644
--- a/fe/src/main/java/org/apache/impala/planner/JoinNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/JoinNode.java
@@ -664,8 +664,7 @@ public abstract class JoinNode extends PlanNode {
         buildSideProfile.postOpenProfile.sum(nodeResourceProfile_));
 
     ResourceProfile finishedBuildProfile = nodeResourceProfile_;
-    if (this instanceof NestedLoopJoinNode
-        || !BackendConfig.INSTANCE.isPartitionedHashJoinEnabled()) {
+    if (this instanceof NestedLoopJoinNode) {
       // These exec node implementations may hold references into the build side, which
       // prevents closing of the build side in a timely manner. This means we have to
       // count the post-open resource consumption of the build side in the same way as

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/507bd8be/fe/src/main/java/org/apache/impala/service/BackendConfig.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/service/BackendConfig.java b/fe/src/main/java/org/apache/impala/service/BackendConfig.java
index 5b641e2..28e5fa3 100644
--- a/fe/src/main/java/org/apache/impala/service/BackendConfig.java
+++ b/fe/src/main/java/org/apache/impala/service/BackendConfig.java
@@ -64,10 +64,6 @@ public class BackendConfig {
     return backendCfg_.sentry_catalog_polling_frequency_s;
   }
 
-  public boolean isPartitionedHashJoinEnabled() {
-    return backendCfg_.enable_partitioned_hash_join;
-  }
-
   // Inits the auth_to_local configuration in the static KerberosName class.
   private static void initAuthToLocal() {
     // If auth_to_local is enabled, we read the configuration hadoop.security.auth_to_local

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/507bd8be/testdata/workloads/functional-query/queries/QueryTest/legacy-joins-aggs.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/legacy-joins-aggs.test b/testdata/workloads/functional-query/queries/QueryTest/legacy-joins-aggs.test
deleted file mode 100644
index 5658f8b..0000000
--- a/testdata/workloads/functional-query/queries/QueryTest/legacy-joins-aggs.test
+++ /dev/null
@@ -1,45 +0,0 @@
-====
----- QUERY
-# Query is allowed because it only references scalars.
-select * from complextypestbl
----- RESULTS
-1
-2
-3
-4
-5
-6
-7
-8
----- TYPES
-bigint
-====
----- QUERY
-# Query is allowed because it executes without a subplan.
-select * from complextypestbl.int_array
----- RESULTS
--1
-1
-1
-2
-2
-3
-3
-NULL
-NULL
-NULL
----- TYPES
-int
-====
----- QUERY
-# Query is not supported because it requires a subplan.
-select item from complextypestbl t, t.int_array
----- CATCH
-Query referencing nested types is not supported because the --enable_partitioned_hash_join and/or --enable_partitioned_aggregation Impala Daemon start-up flags are set to false.
-====
----- QUERY
-# Query is not supported because it requires a subplan.
-select key, value from complextypestbl t, t.int_map
----- CATCH
-Query referencing nested types is not supported because the --enable_partitioned_hash_join and/or --enable_partitioned_aggregation Impala Daemon start-up flags are set to false.
-====

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/507bd8be/tests/common/environ.py
----------------------------------------------------------------------
diff --git a/tests/common/environ.py b/tests/common/environ.py
index 891bb02..571dc65 100644
--- a/tests/common/environ.py
+++ b/tests/common/environ.py
@@ -36,13 +36,7 @@ except ImportError as e:
 LOG = logging.getLogger('tests.common.environ')
 
 
-# See if Impala is running with legacy aggregations and/or hash joins. This is kind of a
-# hack. It would be better to poll Impala whether it is doing so.
 test_start_cluster_args = os.environ.get("TEST_START_CLUSTER_ARGS", "")
-old_agg_regex = "enable_partitioned_aggregation=false"
-old_hash_join_regex = "enable_partitioned_hash_join=false"
-USING_OLD_AGGS_JOINS = re.search(old_agg_regex, test_start_cluster_args) is not None or \
-    re.search(old_hash_join_regex, test_start_cluster_args) is not None
 
 # Find the likely BuildType of the running Impala. Assume it's found through the path
 # $IMPALA_HOME/be/build/latest as a fallback.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/507bd8be/tests/common/skip.py
----------------------------------------------------------------------
diff --git a/tests/common/skip.py b/tests/common/skip.py
index a34d2bc..da3dfe4 100644
--- a/tests/common/skip.py
+++ b/tests/common/skip.py
@@ -24,7 +24,7 @@ import os
 import pytest
 from functools import partial
 
-from tests.common.environ import IMPALAD_BUILD, USING_OLD_AGGS_JOINS
+from tests.common.environ import IMPALAD_BUILD
 from tests.util.filesystem_utils import (
     IS_ISILON,
     IS_LOCAL,
@@ -99,16 +99,6 @@ class SkipIfIsilon:
       reason="This Isilon issue has yet to be triaged.")
   jira = partial(pytest.mark.skipif, IS_ISILON)
 
-class SkipIfOldAggsJoins:
-  nested_types = pytest.mark.skipif(USING_OLD_AGGS_JOINS,
-      reason="Nested types not supported with old aggs and joins")
-  passthrough_preagg = pytest.mark.skipif(USING_OLD_AGGS_JOINS,
-      reason="Passthrough optimization not implemented by old agg")
-  unsupported = pytest.mark.skipif(USING_OLD_AGGS_JOINS,
-      reason="Query unsupported with old aggs and joins")
-  requires_spilling = pytest.mark.skipif(USING_OLD_AGGS_JOINS,
-      reason="Test case requires spilling to pass")
-
 class SkipIfLocal:
   # These ones are skipped due to product limitations.
   caching = pytest.mark.skipif(IS_LOCAL,

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/507bd8be/tests/custom_cluster/test_legacy_joins_aggs.py
----------------------------------------------------------------------
diff --git a/tests/custom_cluster/test_legacy_joins_aggs.py b/tests/custom_cluster/test_legacy_joins_aggs.py
deleted file mode 100644
index 24879d3..0000000
--- a/tests/custom_cluster/test_legacy_joins_aggs.py
+++ /dev/null
@@ -1,33 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
-
-class TestLegacyJoinsAggs(CustomClusterTestSuite):
-  """Tests the behavior of the legacy join and agg nodes with nested types."""
-
-  @classmethod
-  def get_workload(self):
-    return 'functional-query'
-
-  @CustomClusterTestSuite.with_args(
-      impalad_args=('--enable_partitioned_hash_join=false '
-                    '--enable_partitioned_aggregation=false'),
-      catalogd_args='--load_catalog_in_background=false')
-  def test_nested_types(self, vector):
-    self.run_test_case('QueryTest/legacy-joins-aggs', vector,
-      use_db='functional_parquet')

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/507bd8be/tests/metadata/test_ddl.py
----------------------------------------------------------------------
diff --git a/tests/metadata/test_ddl.py b/tests/metadata/test_ddl.py
index d69f603..feccdc5 100644
--- a/tests/metadata/test_ddl.py
+++ b/tests/metadata/test_ddl.py
@@ -22,7 +22,7 @@ import time
 from test_ddl_base import TestDdlBase
 from tests.common.impala_test_suite import LOG
 from tests.common.parametrize import UniqueDatabase
-from tests.common.skip import SkipIf, SkipIfADLS, SkipIfLocal, SkipIfOldAggsJoins
+from tests.common.skip import SkipIf, SkipIfADLS, SkipIfLocal
 from tests.common.test_dimensions import create_single_exec_option_dimension
 from tests.util.filesystem_utils import WAREHOUSE, IS_HDFS, IS_LOCAL, IS_S3, IS_ADLS
 
@@ -196,7 +196,6 @@ class TestDdlStatements(TestDdlBase):
   # supported if old joins and aggs are enabled. Since we do not get any meaningful
   # additional coverage by running a DDL test under the old aggs and joins, it can be
   # skipped.
-  @SkipIfOldAggsJoins.nested_types
   @UniqueDatabase.parametrize(sync_ddl=True)
   def test_create_table(self, vector, unique_database):
     vector.get_value('exec_option')['abort_on_error'] = False

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/507bd8be/tests/query_test/test_aggregation.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_aggregation.py b/tests/query_test/test_aggregation.py
index 4999afe..9e0be6d 100644
--- a/tests/query_test/test_aggregation.py
+++ b/tests/query_test/test_aggregation.py
@@ -20,9 +20,7 @@
 import pytest
 
 from testdata.common import widetable
-from tests.common.environ import USING_OLD_AGGS_JOINS
 from tests.common.impala_test_suite import ImpalaTestSuite
-from tests.common.skip import SkipIfOldAggsJoins
 from tests.common.test_dimensions import (
     create_exec_option_dimension,
     create_uncompressed_text_dimension)
@@ -122,8 +120,6 @@ class TestAggregation(ImpalaTestSuite):
   def test_aggregation(self, vector):
     exec_option = vector.get_value('exec_option')
     disable_codegen = exec_option['disable_codegen']
-    # The old aggregation node does not support codegen for all aggregate functions.
-    check_codegen_enabled = not disable_codegen and not USING_OLD_AGGS_JOINS
     data_type, agg_func = (vector.get_value('data_type'), vector.get_value('agg_func'))
 
     query = 'select %s(%s_col) from alltypesagg where day is not null' % (agg_func,
@@ -133,7 +129,7 @@ class TestAggregation(ImpalaTestSuite):
     assert len(result.data) == 1
     self.verify_agg_result(agg_func, data_type, False, result.data[0]);
 
-    if check_codegen_enabled:
+    if not disable_codegen:
       # Verify codegen was enabled for the preaggregation.
       # It is deliberately disabled for the merge aggregation.
       assert_codegen_enabled(result.runtime_profile, [1])
@@ -144,7 +140,7 @@ class TestAggregation(ImpalaTestSuite):
     assert len(result.data) == 1
     self.verify_agg_result(agg_func, data_type, True, result.data[0]);
 
-    if check_codegen_enabled:
+    if not disable_codegen:
       # Verify codegen was enabled for all stages of the aggregation.
       assert_codegen_enabled(result.runtime_profile, [1, 2, 4, 6])
 
@@ -234,8 +230,7 @@ class TestAggregationQueries(ImpalaTestSuite):
       assert(set(row[i].split(delimiter[i-1])) == set(['1', '2', '3', '4']))
     assert(row[4] == '40')
     assert(row[5] == '4')
-    check_codegen_enabled = not disable_codegen and not USING_OLD_AGGS_JOINS
-    if check_codegen_enabled:
+    if not disable_codegen:
       # Verify codegen was enabled for all three stages of the aggregation.
       assert_codegen_enabled(result.runtime_profile, [1, 2, 4])
 
@@ -267,7 +262,7 @@ class TestAggregationQueries(ImpalaTestSuite):
     where int_col < 10"""
     result = self.execute_query(query, exec_option, table_format=table_format)
     assert(set((result.data)[0].split(" ")) == set(['1','2','3','4','5','6','7','8','9']))
-    if check_codegen_enabled:
+    if not disable_codegen:
       # Verify codegen was enabled for all four stages of the aggregation.
       assert_codegen_enabled(result.runtime_profile, [1, 2, 4, 6])
 
@@ -333,6 +328,5 @@ class TestTPCHAggregationQueries(ImpalaTestSuite):
   def test_tpch_aggregations(self, vector):
     self.run_test_case('tpch-aggregations', vector)
 
-  @SkipIfOldAggsJoins.passthrough_preagg
   def test_tpch_passthrough_aggregations(self, vector):
     self.run_test_case('tpch-passthrough-aggregations', vector)

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/507bd8be/tests/query_test/test_join_queries.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_join_queries.py b/tests/query_test/test_join_queries.py
index b333a71..a8f2be0 100644
--- a/tests/query_test/test_join_queries.py
+++ b/tests/query_test/test_join_queries.py
@@ -25,7 +25,6 @@ from tests.common.skip import (
     SkipIf,
     SkipIfIsilon,
     SkipIfLocal,
-    SkipIfOldAggsJoins,
     SkipIfS3,
     SkipIfADLS)
 from tests.common.test_vector import ImpalaTestDimension
@@ -62,7 +61,6 @@ class TestJoinQueries(ImpalaTestSuite):
     new_vector.get_value('exec_option')['num_nodes'] = 1
     self.run_test_case('QueryTest/single-node-joins-with-limits-exhaustive', new_vector)
 
-  @SkipIfOldAggsJoins.unsupported
   def test_partitioned_joins(self, vector):
     self.run_test_case('QueryTest/joins-partitioned', vector)
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/507bd8be/tests/query_test/test_mt_dop.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_mt_dop.py b/tests/query_test/test_mt_dop.py
index 1277002..38a8314 100644
--- a/tests/query_test/test_mt_dop.py
+++ b/tests/query_test/test_mt_dop.py
@@ -22,7 +22,6 @@ import pytest
 from copy import deepcopy
 from tests.common.impala_test_suite import ImpalaTestSuite
 from tests.common.kudu_test_suite import KuduTestSuite
-from tests.common.skip import SkipIfOldAggsJoins
 from tests.common.test_vector import ImpalaTestDimension
 
 # COMPUTE STATS on Parquet tables automatically sets MT_DOP=4, so include
@@ -94,7 +93,6 @@ class TestMtDopParquet(ImpalaTestSuite):
 
   @pytest.mark.xfail(pytest.config.option.testing_remote_cluster,
                      reason='IMPALA-4641')
-  @SkipIfOldAggsJoins.nested_types
   def test_parquet_nested(self, vector):
     vector.get_value('exec_option')['mt_dop'] = vector.get_value('mt_dop')
     self.run_test_case('QueryTest/mt-dop-parquet-nested', vector)

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/507bd8be/tests/query_test/test_nested_types.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_nested_types.py b/tests/query_test/test_nested_types.py
index 5635898..96a170b 100644
--- a/tests/query_test/test_nested_types.py
+++ b/tests/query_test/test_nested_types.py
@@ -23,7 +23,6 @@ from subprocess import check_call
 from tests.beeswax.impala_beeswax import ImpalaBeeswaxException
 from tests.common.impala_test_suite import ImpalaTestSuite
 from tests.common.skip import (
-    SkipIfOldAggsJoins,
     SkipIfIsilon,
     SkipIfS3,
     SkipIfADLS,
@@ -31,7 +30,6 @@ from tests.common.skip import (
 
 from tests.util.filesystem_utils import WAREHOUSE, get_fs_path
 
-@SkipIfOldAggsJoins.nested_types
 class TestNestedTypes(ImpalaTestSuite):
   @classmethod
   def get_workload(self):
@@ -88,7 +86,6 @@ class TestNestedTypes(ImpalaTestSuite):
     vector.get_value('exec_option')['num_nodes'] = 1
     self.run_test_case('QueryTest/nested-types-parquet-stats', vector)
 
-@SkipIfOldAggsJoins.nested_types
 class TestParquetArrayEncodings(ImpalaTestSuite):
   TESTFILE_DIR = os.path.join(os.environ['IMPALA_HOME'],
                               "testdata/parquet_nested_types_encodings")
@@ -534,7 +531,6 @@ class TestParquetArrayEncodings(ImpalaTestSuite):
     local_path = self.TESTFILE_DIR + "/" + filename
     check_call(["hadoop", "fs", "-put", local_path, location], shell=False)
 
-@SkipIfOldAggsJoins.nested_types
 class TestMaxNestingDepth(ImpalaTestSuite):
   # Should be kept in sync with the FE's Type.MAX_NESTING_DEPTH
   MAX_NESTING_DEPTH = 100

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/507bd8be/tests/query_test/test_runtime_filters.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_runtime_filters.py b/tests/query_test/test_runtime_filters.py
index 6edfd82..7710f43 100644
--- a/tests/query_test/test_runtime_filters.py
+++ b/tests/query_test/test_runtime_filters.py
@@ -20,7 +20,7 @@ import pytest
 import time
 
 from tests.common.impala_test_suite import ImpalaTestSuite
-from tests.common.skip import SkipIfLocal, SkipIfOldAggsJoins
+from tests.common.skip import SkipIfLocal
 
 @SkipIfLocal.multiple_impalad
 class TestRuntimeFilters(ImpalaTestSuite):
@@ -63,7 +63,5 @@ class TestRuntimeRowFilters(ImpalaTestSuite):
   def test_row_filters(self, vector):
     self.run_test_case('QueryTest/runtime_row_filters', vector)
 
-  @SkipIfOldAggsJoins.requires_spilling
-  @SkipIfOldAggsJoins.nested_types
   def test_row_filters_phj_only(self, vector):
     self.run_test_case('QueryTest/runtime_row_filters_phj', vector)

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/507bd8be/tests/query_test/test_scanners.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_scanners.py b/tests/query_test/test_scanners.py
index 74a69ac..e9fd457 100644
--- a/tests/query_test/test_scanners.py
+++ b/tests/query_test/test_scanners.py
@@ -36,7 +36,6 @@ from tests.common.skip import (
     SkipIfS3,
     SkipIfADLS,
     SkipIfIsilon,
-    SkipIfOldAggsJoins,
     SkipIfLocal)
 from tests.common.test_dimensions import create_single_exec_option_dimension
 from tests.common.test_result_verifier import (
@@ -251,7 +250,6 @@ class TestParquet(ImpalaTestSuite):
   def test_parquet(self, vector):
     self.run_test_case('QueryTest/parquet', vector)
 
-  @SkipIfOldAggsJoins.nested_types
   def test_corrupt_files(self, vector):
     vector.get_value('exec_option')['abort_on_error'] = 0
     self.run_test_case('QueryTest/parquet-continue-on-error', vector)
@@ -536,7 +534,6 @@ class TestParquet(ImpalaTestSuite):
     assert c_schema_elt.converted_type == ConvertedType.UTF8
     assert d_schema_elt.converted_type == None
 
-  @SkipIfOldAggsJoins.nested_types
   def test_resolution_by_name(self, vector, unique_database):
     self.run_test_case('QueryTest/parquet-resolution-by-name', vector,
                        use_db=unique_database)

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/507bd8be/tests/query_test/test_tpch_nested_queries.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_tpch_nested_queries.py b/tests/query_test/test_tpch_nested_queries.py
index 7a78cbe..28e6ac3 100644
--- a/tests/query_test/test_tpch_nested_queries.py
+++ b/tests/query_test/test_tpch_nested_queries.py
@@ -18,10 +18,8 @@
 # Functional tests running the TPCH workload.
 
 from tests.common.impala_test_suite import ImpalaTestSuite
-from tests.common.skip import SkipIfOldAggsJoins
 from tests.common.test_dimensions import create_single_exec_option_dimension
 
-@SkipIfOldAggsJoins.nested_types
 class TestTpchNestedQuery(ImpalaTestSuite):
   @classmethod
   def get_workload(self):


[6/6] incubator-impala git commit: IMPALA-5658: addtl. process/system-wide memory metrics

Posted by ta...@apache.org.
IMPALA-5658: addtl. process/system-wide memory metrics

This is meant to help understand how the process is using memory, to
answer questions like:
* Is the Impala daemon using transparent huge pages?
* What is the system THP configuration?
* What is the RSS of the process?
* What is the virtual memory size of the process?

Most of these questions can be answered with other command-line or
monitoring tools, except for the THP usage by the process, but adding
metrics helps consolidate the information in one place.

This commit adds a memory maintenance thread that periodically scrapes
information from the /proc and /sys filesystems and updates the
metrics.

The interfaces used are:
* /proc/<pid>/smaps, which is a documented interface to get detailed
  information about mapped memory:
  https://www.kernel.org/doc/Documentation/filesystems/proc.txt
* /sys/kernel/mm/ config values for transparent huge pages

Change-Id: I13873e305ba464d11dea0d7244a29ff4f332f1a9
Reviewed-on: http://gerrit.cloudera.org:8080/7472
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/3f82d157
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/3f82d157
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/3f82d157

Branch: refs/heads/master
Commit: 3f82d15704160f31d4060123520c3fb3493a3533
Parents: 507bd8b
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Wed Jul 19 14:44:00 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Wed Aug 2 04:06:57 2017 +0000

----------------------------------------------------------------------
 be/src/common/init.cc                |  30 +++++----
 be/src/runtime/exec-env.cc           |   2 +-
 be/src/util/default-path-handlers.cc |  11 ++++
 be/src/util/mem-info.cc              | 102 ++++++++++++++++++++++++++++--
 be/src/util/mem-info.h               |  48 +++++++++++++-
 be/src/util/memory-metrics.cc        |  75 +++++++++++++++++-----
 be/src/util/memory-metrics.h         |  35 +++++++++-
 common/thrift/metrics.json           |  84 ++++++++++++++++++++++++
 www/memz.tmpl                        |  35 ++++++++++
 9 files changed, 384 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3f82d157/be/src/common/init.cc
----------------------------------------------------------------------
diff --git a/be/src/common/init.cc b/be/src/common/init.cc
index f05a995..797df39 100644
--- a/be/src/common/init.cc
+++ b/be/src/common/init.cc
@@ -128,21 +128,27 @@ static scoped_ptr<impala::Thread> pause_monitor;
   while (true) {
     SleepForMs(FLAGS_memory_maintenance_sleep_time_ms);
     impala::ExecEnv* env = impala::ExecEnv::GetInstance();
-    if (env == nullptr) continue; // ExecEnv may not have been created yet.
-    BufferPool* buffer_pool = env->buffer_pool();
-    if (buffer_pool != nullptr) buffer_pool->Maintenance();
+    // ExecEnv may not have been created yet or this may be the catalogd or statestored,
+    // which don't have ExecEnvs.
+    if (env != nullptr) {
+      BufferPool* buffer_pool = env->buffer_pool();
+      if (buffer_pool != nullptr) buffer_pool->Maintenance();
 
 #ifndef ADDRESS_SANITIZER
-    // When using tcmalloc, the process limit as measured by our trackers will
-    // be out of sync with the process usage. The metric is refreshed whenever
-    // memory is consumed or released via a MemTracker, so on a system with
-    // queries executing it will be refreshed frequently. However if the system
-    // is idle, we need to refresh the tracker occasionally since untracked
-    // memory may be allocated or freed, e.g. by background threads.
-    if (env != NULL && env->process_mem_tracker() != NULL) {
-      env->process_mem_tracker()->RefreshConsumptionFromMetric();
-    }
+      // When using tcmalloc, the process limit as measured by our trackers will
+      // be out of sync with the process usage. The metric is refreshed whenever
+      // memory is consumed or released via a MemTracker, so on a system with
+      // queries executing it will be refreshed frequently. However if the system
+      // is idle, we need to refresh the tracker occasionally since untracked
+      // memory may be allocated or freed, e.g. by background threads.
+      if (env->process_mem_tracker() != nullptr) {
+        env->process_mem_tracker()->RefreshConsumptionFromMetric();
+      }
 #endif
+    }
+    // Periodically refresh values of the aggregate memory metrics to ensure they are
+    // somewhat up-to-date.
+    AggregateMemoryMetrics::Refresh();
   }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3f82d157/be/src/runtime/exec-env.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/exec-env.cc b/be/src/runtime/exec-env.cc
index 6d5fb7a..960e3c9 100644
--- a/be/src/runtime/exec-env.cc
+++ b/be/src/runtime/exec-env.cc
@@ -241,7 +241,7 @@ Status ExecEnv::StartServices() {
 
   // Limit of -1 means no memory limit.
   mem_tracker_.reset(new MemTracker(
-      AggregateMemoryMetric::TOTAL_USED, bytes_limit > 0 ? bytes_limit : -1, "Process"));
+      AggregateMemoryMetrics::TOTAL_USED, bytes_limit > 0 ? bytes_limit : -1, "Process"));
   if (buffer_pool_ != nullptr) {
     // Add BufferPool MemTrackers for cached memory that is not tracked against queries
     // but is included in process memory consumption.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3f82d157/be/src/util/default-path-handlers.cc
----------------------------------------------------------------------
diff --git a/be/src/util/default-path-handlers.cc b/be/src/util/default-path-handlers.cc
index 508ea45..17d2c94 100644
--- a/be/src/util/default-path-handlers.cc
+++ b/be/src/util/default-path-handlers.cc
@@ -31,6 +31,7 @@
 #include "service/impala-server.h"
 #include "util/common-metrics.h"
 #include "util/debug-util.h"
+#include "util/mem-info.h"
 #include "util/pprof-path-handlers.h"
 #include "util/mem-info.h"
 #include "util/cpu-info.h"
@@ -149,7 +150,17 @@ void MemUsageHandler(MemTracker* mem_tracker, MetricGroup* metric_group,
   Value detailed(mem_tracker->LogUsage().c_str(), document->GetAllocator());
   document->AddMember("detailed", detailed, document->GetAllocator());
 
+  Value systeminfo(MemInfo::DebugString().c_str(), document->GetAllocator());
+  document->AddMember("systeminfo", systeminfo, document->GetAllocator());
+
   if (metric_group != nullptr) {
+    MetricGroup* aggregate_group = metric_group->FindChildGroup("memory");
+    if (aggregate_group != nullptr) {
+      Value json_metrics(kObjectType);
+      aggregate_group->ToJson(false, document, &json_metrics);
+      document->AddMember(
+          "aggregate_metrics", json_metrics["metrics"], document->GetAllocator());
+    }
     MetricGroup* jvm_group = metric_group->FindChildGroup("jvm");
     if (jvm_group != nullptr) {
       Value jvm(kObjectType);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3f82d157/be/src/util/mem-info.cc
----------------------------------------------------------------------
diff --git a/be/src/util/mem-info.cc b/be/src/util/mem-info.cc
index 89144b2..4d551f1 100644
--- a/be/src/util/mem-info.cc
+++ b/be/src/util/mem-info.cc
@@ -19,14 +19,16 @@
 #include "util/debug-util.h"
 #include "util/string-parser.h"
 
-#include <boost/algorithm/string.hpp>
-#include <boost/lexical_cast.hpp>
-#include <iostream>
-#include <fstream>
-#include <sstream>
 #include <stdlib.h>
 #include <string.h>
 #include <unistd.h>
+#include <cctype>
+#include <fstream>
+#include <iostream>
+#include <sstream>
+
+#include <boost/algorithm/string.hpp>
+#include <boost/lexical_cast.hpp>
 
 #include "util/pretty-printer.h"
 
@@ -34,6 +36,7 @@
 
 using boost::algorithm::is_any_of;
 using boost::algorithm::split;
+using boost::algorithm::trim;
 using boost::algorithm::token_compress_on;
 
 namespace impala {
@@ -96,13 +99,98 @@ void MemInfo::ParseOvercommit() {
   overcommit_s >> vm_overcommit_;
 }
 
+bool MemInfo::HaveSmaps() {
+  MappedMemInfo result;
+  ifstream smaps("/proc/self/smaps", ios::in);
+  return smaps.good();
+}
+
+MappedMemInfo MemInfo::ParseSmaps() {
+  MappedMemInfo result;
+  ifstream smaps("/proc/self/smaps", ios::in);
+  if (!smaps) {
+    LOG_FIRST_N(INFO, 1) << "Could not open smaps";
+    return result;
+  }
+  while (smaps) {
+    string line;
+    getline(smaps, line);
+    if (line.empty()) continue;
+    if (isdigit(line[0])) {
+      // Line is the start of a new mapping, of form:
+      // 561ceff9c000-561ceffa1000 rw-p 00000000 00:00 0
+      ++result.num_maps;
+      continue;
+    }
+    // Line is in the form of <Name>: <value>, e.g.:
+    // Size: 1084 kB
+    // VmFlags: rd ex mr mw me dw
+    size_t colon_pos = line.find(':');
+    if (colon_pos == string::npos) continue;
+    string name = line.substr(0, colon_pos);
+    string value = line.substr(colon_pos + 1, string::npos);
+    trim(value);
+
+    // Use atol() to parse the value, ignoring " kB" suffix.
+    if (name == "Size") {
+      result.size_kb += atol(value.c_str());
+    } else if (name == "Rss") {
+      result.rss_kb += atol(value.c_str());
+    } else if (name == "AnonHugePages") {
+      result.anon_huge_pages_kb += atol(value.c_str());
+    }
+  }
+  return result;
+}
+
+ThpConfig MemInfo::ParseThpConfig() {
+  ThpConfig result;
+  result.enabled = GetThpConfigVal("enabled");
+  result.defrag = GetThpConfigVal("defrag");
+  result.khugepaged_defrag = GetThpConfigVal("khugepaged/defrag");
+  return result;
+}
+
+string MemInfo::GetThpConfigVal(const string& relative_path) {
+  // This is the standard location for the configs.
+  ifstream file("/sys/kernel/mm/transparent_hugepage/" + relative_path);
+  if (!file) {
+    // Some earlier versions of CentOS/RHEL put the configs in a different place.
+    file.open("/sys/kernel/mm/redhat_transparent_hugepage/" + relative_path);
+    if (!file) {
+      LOG_FIRST_N(INFO, 1) << "Could not open thp config: " << relative_path;
+      return "<unknown>";
+    }
+  }
+  string result;
+  getline(file, result);
+  return result;
+}
+
 string MemInfo::DebugString() {
   DCHECK(initialized_);
   stringstream stream;
-  stream << "Physical Memory: "
-         << PrettyPrinter::Print(physical_mem_, TUnit::BYTES)
+  stream << "Physical Memory: " << PrettyPrinter::Print(physical_mem_, TUnit::BYTES)
          << endl;
+  stream << ParseThpConfig().DebugString();
+  return stream.str();
+}
+
+string MappedMemInfo::DebugString() const {
+  stringstream stream;
+  stream << "Number of mappings: " << num_maps << endl;
+  stream << "Total mapping (kB): " << size_kb << endl;
+  stream << "RSS (kB): " << rss_kb << endl;
+  stream << "Anon huge pages (kB): " << anon_huge_pages_kb << endl;
   return stream.str();
 }
 
+string ThpConfig::DebugString() const {
+  stringstream stream;
+  stream << "Transparent Huge Pages Config:" << endl;
+  stream << "  enabled: " << enabled << endl;
+  stream << "  defrag: " << defrag << endl;
+  stream << "  khugepaged defrag: " << khugepaged_defrag << endl;
+  return stream.str();
+}
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3f82d157/be/src/util/mem-info.h
----------------------------------------------------------------------
diff --git a/be/src/util/mem-info.h b/be/src/util/mem-info.h
index f7ef844..2e32d94 100644
--- a/be/src/util/mem-info.h
+++ b/be/src/util/mem-info.h
@@ -25,6 +25,37 @@
 
 namespace impala {
 
+/// Information obtained from /proc/<pid>/smaps.
+struct MappedMemInfo {
+  // Number of memory maps.
+  int64_t num_maps = 0;
+
+  // Total size of memory maps (i.e. virtual memory size) in kilobytes.
+  int64_t size_kb = 0;
+
+  // RSS in kilobytes
+  int64_t rss_kb = 0;
+
+  // Kilobytes of anonymous huge pages.
+  int64_t anon_huge_pages_kb = 0;
+
+  std::string DebugString() const;
+};
+
+/// Information about the system transparent huge pages config.
+struct ThpConfig {
+  // Whether THP is enabled. Just contains the raw string, e.g. "[always] madvise never".
+  std::string enabled;
+
+  // Whether synchronous THP defrag is enabled, e.g. "[always] madvise never".
+  std::string defrag;
+
+  // Whether THP defrag via khugepaged is enabled. Usually "0"/"1".
+  std::string khugepaged_defrag;
+
+  std::string DebugString() const;
+};
+
 /// Provides the amount of physical memory available.
 /// Populated from /proc/meminfo.
 /// TODO: Allow retrieving of cgroup memory limits,
@@ -54,12 +85,27 @@ class MemInfo {
     return commit_limit_;
   }
 
+  /// Return true if the /proc/<pid>/smaps file is present and can be opened.
+  static bool HaveSmaps();
+
+  /// Parse /proc/<pid>/smaps for this process and extract relevant information.
+  /// Logs an INFO message (first time only) if the file could not be opened.
+  static MappedMemInfo ParseSmaps();
+
+  /// Parse the transparent huge pages configs.
+  /// A value whose file cannot be opened is set to "<unknown>" (logged once at INFO).
+  static ThpConfig ParseThpConfig();
+
   static std::string DebugString();
 
  private:
-
   static void ParseOvercommit();
 
+  /// Get the config value from a file, trying the path relative to both
+  /// /sys/kernel/mm/transparent_hugepage and /sys/kernel/mm/redhat_transparent_hugepage.
+  /// Assumes the file has a single line only.
+  static std::string GetThpConfigVal(const std::string& relative_path);
+
   static bool initialized_;
   static int64_t physical_mem_;
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3f82d157/be/src/util/memory-metrics.cc
----------------------------------------------------------------------
diff --git a/be/src/util/memory-metrics.cc b/be/src/util/memory-metrics.cc
index b7aa3a6..f2ddcb9 100644
--- a/be/src/util/memory-metrics.cc
+++ b/be/src/util/memory-metrics.cc
@@ -23,6 +23,7 @@
 #include "runtime/bufferpool/buffer-pool.h"
 #include "runtime/bufferpool/reservation-tracker.h"
 #include "util/jni-util.h"
+#include "util/mem-info.h"
 #include "util/time.h"
 
 using boost::algorithm::to_lower;
@@ -31,13 +32,20 @@ using namespace strings;
 
 DECLARE_bool(mmap_buffers);
 
-SumGauge<uint64_t>* AggregateMemoryMetric::TOTAL_USED = nullptr;
+SumGauge<uint64_t>* AggregateMemoryMetrics::TOTAL_USED = nullptr;
+UIntGauge* AggregateMemoryMetrics::NUM_MAPS = nullptr;
+UIntGauge* AggregateMemoryMetrics::MAPPED_BYTES = nullptr;
+UIntGauge* AggregateMemoryMetrics::RSS = nullptr;
+UIntGauge* AggregateMemoryMetrics::ANON_HUGE_PAGE_BYTES = nullptr;
+StringProperty* AggregateMemoryMetrics::THP_ENABLED = nullptr;
+StringProperty* AggregateMemoryMetrics::THP_DEFRAG = nullptr;
+StringProperty* AggregateMemoryMetrics::THP_KHUGEPAGED_DEFRAG = nullptr;
 
-TcmallocMetric* TcmallocMetric::BYTES_IN_USE = NULL;
-TcmallocMetric* TcmallocMetric::PAGEHEAP_FREE_BYTES = NULL;
-TcmallocMetric* TcmallocMetric::TOTAL_BYTES_RESERVED = NULL;
-TcmallocMetric* TcmallocMetric::PAGEHEAP_UNMAPPED_BYTES = NULL;
-TcmallocMetric::PhysicalBytesMetric* TcmallocMetric::PHYSICAL_BYTES_RESERVED = NULL;
+TcmallocMetric* TcmallocMetric::BYTES_IN_USE = nullptr;
+TcmallocMetric* TcmallocMetric::PAGEHEAP_FREE_BYTES = nullptr;
+TcmallocMetric* TcmallocMetric::TOTAL_BYTES_RESERVED = nullptr;
+TcmallocMetric* TcmallocMetric::PAGEHEAP_UNMAPPED_BYTES = nullptr;
+TcmallocMetric::PhysicalBytesMetric* TcmallocMetric::PHYSICAL_BYTES_RESERVED = nullptr;
 
 AsanMallocMetric* AsanMallocMetric::BYTES_ALLOCATED = nullptr;
 
@@ -76,34 +84,71 @@ Status impala::RegisterMemoryMetrics(MetricGroup* metrics, bool register_jvm_met
       new AsanMallocMetric(MetricDefs::Get("asan-total-bytes-allocated")));
   used_metrics.push_back(AsanMallocMetric::BYTES_ALLOCATED);
 #else
+  MetricGroup* tcmalloc_metrics = metrics->GetOrCreateChildGroup("tcmalloc");
   // We rely on TCMalloc for our global memory metrics, so skip setting them up
   // if we're not using TCMalloc.
   TcmallocMetric::BYTES_IN_USE = TcmallocMetric::CreateAndRegister(
-      metrics, "tcmalloc.bytes-in-use", "generic.current_allocated_bytes");
+      tcmalloc_metrics, "tcmalloc.bytes-in-use", "generic.current_allocated_bytes");
 
   TcmallocMetric::TOTAL_BYTES_RESERVED = TcmallocMetric::CreateAndRegister(
-      metrics, "tcmalloc.total-bytes-reserved", "generic.heap_size");
+      tcmalloc_metrics, "tcmalloc.total-bytes-reserved", "generic.heap_size");
 
-  TcmallocMetric::PAGEHEAP_FREE_BYTES = TcmallocMetric::CreateAndRegister(metrics,
-      "tcmalloc.pageheap-free-bytes", "tcmalloc.pageheap_free_bytes");
+  TcmallocMetric::PAGEHEAP_FREE_BYTES = TcmallocMetric::CreateAndRegister(
+      tcmalloc_metrics, "tcmalloc.pageheap-free-bytes", "tcmalloc.pageheap_free_bytes");
 
-  TcmallocMetric::PAGEHEAP_UNMAPPED_BYTES = TcmallocMetric::CreateAndRegister(metrics,
-      "tcmalloc.pageheap-unmapped-bytes", "tcmalloc.pageheap_unmapped_bytes");
+  TcmallocMetric::PAGEHEAP_UNMAPPED_BYTES =
+      TcmallocMetric::CreateAndRegister(tcmalloc_metrics,
+          "tcmalloc.pageheap-unmapped-bytes", "tcmalloc.pageheap_unmapped_bytes");
 
   TcmallocMetric::PHYSICAL_BYTES_RESERVED =
-      metrics->RegisterMetric(new TcmallocMetric::PhysicalBytesMetric(
+      tcmalloc_metrics->RegisterMetric(new TcmallocMetric::PhysicalBytesMetric(
           MetricDefs::Get("tcmalloc.physical-bytes-reserved")));
 
   used_metrics.push_back(TcmallocMetric::PHYSICAL_BYTES_RESERVED);
 #endif
-  AggregateMemoryMetric::TOTAL_USED = metrics->RegisterMetric(
+  MetricGroup* aggregate_metrics = metrics->GetOrCreateChildGroup("memory");
+  AggregateMemoryMetrics::TOTAL_USED = aggregate_metrics->RegisterMetric(
       new SumGauge<uint64_t>(MetricDefs::Get("memory.total-used"), used_metrics));
   if (register_jvm_metrics) {
     RETURN_IF_ERROR(JvmMetric::InitMetrics(metrics->GetOrCreateChildGroup("jvm")));
   }
+
+  if (MemInfo::HaveSmaps()) {
+    AggregateMemoryMetrics::NUM_MAPS =
+        aggregate_metrics->AddGauge<uint64_t>("memory.num-maps", 0U);
+    AggregateMemoryMetrics::MAPPED_BYTES =
+        aggregate_metrics->AddGauge<uint64_t>("memory.mapped-bytes", 0U);
+    AggregateMemoryMetrics::RSS = aggregate_metrics->AddGauge<uint64_t>("memory.rss", 0U);
+    AggregateMemoryMetrics::ANON_HUGE_PAGE_BYTES =
+        aggregate_metrics->AddGauge<uint64_t>("memory.anon-huge-page-bytes", 0U);
+  }
+  ThpConfig thp_config = MemInfo::ParseThpConfig();
+  AggregateMemoryMetrics::THP_ENABLED =
+      aggregate_metrics->AddProperty("memory.thp.enabled", thp_config.enabled);
+  AggregateMemoryMetrics::THP_DEFRAG =
+      aggregate_metrics->AddProperty("memory.thp.defrag", thp_config.defrag);
+  AggregateMemoryMetrics::THP_KHUGEPAGED_DEFRAG = aggregate_metrics->AddProperty(
+      "memory.thp.khugepaged-defrag", thp_config.khugepaged_defrag);
+  AggregateMemoryMetrics::Refresh();
   return Status::OK();
 }
 
+void AggregateMemoryMetrics::Refresh() {
+  if (NUM_MAPS != nullptr) {
+    // Only call ParseSmaps() if the metrics were created.
+    MappedMemInfo map_info = MemInfo::ParseSmaps();
+    NUM_MAPS->set_value(map_info.num_maps);
+    MAPPED_BYTES->set_value(map_info.size_kb * 1024);
+    RSS->set_value(map_info.rss_kb * 1024);
+    ANON_HUGE_PAGE_BYTES->set_value(map_info.anon_huge_pages_kb * 1024);
+  }
+
+  ThpConfig thp_config = MemInfo::ParseThpConfig();
+  THP_ENABLED->set_value(thp_config.enabled);
+  THP_DEFRAG->set_value(thp_config.defrag);
+  THP_KHUGEPAGED_DEFRAG->set_value(thp_config.khugepaged_defrag);
+}
+
 JvmMetric* JvmMetric::CreateAndRegister(MetricGroup* metrics, const string& key,
     const string& pool_name, JvmMetric::JvmMetricType type) {
   string pool_name_for_key = pool_name;
@@ -120,7 +165,7 @@ JvmMetric::JvmMetric(const TMetricDef& def, const string& mempool_name,
 }
 
 Status JvmMetric::InitMetrics(MetricGroup* metrics) {
-  DCHECK(metrics != NULL);
+  DCHECK(metrics != nullptr);
   TGetJvmMetricsRequest request;
   request.get_all = true;
   TGetJvmMetricsResponse response;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3f82d157/be/src/util/memory-metrics.h
----------------------------------------------------------------------
diff --git a/be/src/util/memory-metrics.h b/be/src/util/memory-metrics.h
index 5149d9c..ffc3d20 100644
--- a/be/src/util/memory-metrics.h
+++ b/be/src/util/memory-metrics.h
@@ -37,14 +37,45 @@ class ReservationTracker;
 class Thread;
 
 /// Memory metrics including TCMalloc and BufferPool memory.
-class AggregateMemoryMetric {
+class AggregateMemoryMetrics {
  public:
   /// The sum of Tcmalloc TOTAL_BYTES_RESERVED and BufferPool SYSTEM_ALLOCATED.
   /// Approximates the total amount of physical memory consumed by the backend (i.e. not
   /// including JVM memory), which is either in use by queries or cached by the BufferPool
-  /// or TcMalloc. NULL when running under ASAN.
+  /// or the malloc implementation.
   /// TODO: IMPALA-691 - consider changing this to include JVM memory.
   static SumGauge<uint64_t>* TOTAL_USED;
+
+  /// The total number of virtual memory regions for the process.
+  /// The value must be refreshed by calling Refresh().
+  static UIntGauge* NUM_MAPS;
+
+  /// The total size of virtual memory regions for the process.
+  /// The value must be refreshed by calling Refresh().
+  static UIntGauge* MAPPED_BYTES;
+
+  /// The total RSS of all virtual memory regions for the process.
+  /// The value must be refreshed by calling Refresh().
+  static UIntGauge* RSS;
+
+  /// The total size of anonymous huge pages mapped by the process, in bytes.
+  /// The value must be refreshed by calling Refresh().
+  static UIntGauge* ANON_HUGE_PAGE_BYTES;
+
+  /// The string reporting the /enabled setting for transparent huge pages.
+  /// The value must be refreshed by calling Refresh().
+  static StringProperty* THP_ENABLED;
+
+  /// The string reporting the /defrag setting for transparent huge pages.
+  /// The value must be refreshed by calling Refresh().
+  static StringProperty* THP_DEFRAG;
+
+  /// The string reporting the khugepaged/defrag setting for transparent huge pages.
+  /// The value must be refreshed by calling Refresh().
+  static StringProperty* THP_KHUGEPAGED_DEFRAG;
+
+  /// Refreshes values of any of the aggregate metrics that require refreshing.
+  static void Refresh();
 };
 
 /// Specialised metric which exposes numeric properties from tcmalloc.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3f82d157/common/thrift/metrics.json
----------------------------------------------------------------------
diff --git a/common/thrift/metrics.json b/common/thrift/metrics.json
index 251ea14..2fbb6fc 100644
--- a/common/thrift/metrics.json
+++ b/common/thrift/metrics.json
@@ -1208,6 +1208,90 @@
     "key": "memory.total-used"
   },
   {
+    "description": "Total number of memory mappings in this process.",
+    "contexts": [
+      "STATESTORE",
+      "CATALOGSERVER",
+      "IMPALAD"
+    ],
+    "label": "Number of memory mappings.",
+    "units": "NONE",
+    "kind": "GAUGE",
+    "key": "memory.num-maps"
+  },
+  {
+    "description": "Total bytes of memory mappings in this process (the virtual memory size).",
+    "contexts": [
+      "STATESTORE",
+      "CATALOGSERVER",
+      "IMPALAD"
+    ],
+    "label": "Total Virtual Memory.",
+    "units": "BYTES",
+    "kind": "GAUGE",
+    "key": "memory.mapped-bytes"
+  },
+  {
+    "description": "Resident set size (RSS) of this process, including TCMalloc, buffer pool and Jvm.",
+    "contexts": [
+      "STATESTORE",
+      "CATALOGSERVER",
+      "IMPALAD"
+    ],
+    "label": "Resident set size.",
+    "units": "BYTES",
+    "kind": "GAUGE",
+    "key": "memory.rss"
+  },
+  {
+    "description": "Total bytes of anonymous (a.k.a. transparent) huge pages used by this process.",
+    "contexts": [
+      "STATESTORE",
+      "CATALOGSERVER",
+      "IMPALAD"
+    ],
+    "label": "Anonymous Huge Pages.",
+    "units": "BYTES",
+    "kind": "GAUGE",
+    "key": "memory.anon-huge-page-bytes"
+  },
+  {
+    "description": "The system-wide 'enabled' setting for Transparent Huge Pages.",
+    "contexts": [
+      "STATESTORE",
+      "CATALOGSERVER",
+      "IMPALAD"
+    ],
+    "label": "Transparent Huge Pages Enabled.",
+    "units": "NONE",
+    "kind": "PROPERTY",
+    "key": "memory.thp.enabled"
+  },
+  {
+    "description": "The system-wide 'defrag' setting for Transparent Huge Pages.",
+    "contexts": [
+      "STATESTORE",
+      "CATALOGSERVER",
+      "IMPALAD"
+    ],
+    "label": "Transparent Huge Pages Defrag Enabled.",
+    "units": "NONE",
+    "kind": "PROPERTY",
+    "key": "memory.thp.defrag"
+  },
+  {
+    "description": "The system-wide 'defrag' setting for khugepaged.",
+    "contexts": [
+      "STATESTORE",
+      "CATALOGSERVER",
+      "IMPALAD"
+    ],
+    "label": "Khugepaged Defrag Enabled.",
+    "units": "NONE",
+    "kind": "PROPERTY",
+    "key": "memory.thp.khugepaged-defrag"
+  },
+  {
     "description": "The number of running threads in this process.",
     "contexts": [
       "STATESTORE",

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3f82d157/www/memz.tmpl
----------------------------------------------------------------------
diff --git a/www/memz.tmpl b/www/memz.tmpl
index fca45c0..9c629d8 100644
--- a/www/memz.tmpl
+++ b/www/memz.tmpl
@@ -28,6 +28,41 @@ Memory consumption / limit: <strong>{{consumption}}</strong> / <strong>{{mem_lim
 <h3>tcmalloc</h3>
 <pre>{{overview}}</pre>
 
+<h3>System</h3>
+<pre>{{systeminfo}}</pre>
+
+{{?aggregate_metrics}}
+<h3>Process and system memory metrics</h3>
+<table class='table table-bordered table-hover'>
+  <tr>
+    <th>Name</th>
+    <th>Value</th>
+    <th>Description</th>
+  </tr>
+{{/aggregate_metrics}}
+{{#aggregate_metrics}}
+  <tr>
+    <td><tt>{{name}}</tt></td>
+    {{! Is this a stats metric? }}
+    {{?mean}}
+    <td>
+      Last (of {{count}}): <strong>{{last}}</strong>.
+      Min: {{min}}, max: {{max}}, avg: {{mean}}</td>
+    {{/mean}}
+    {{^mean}}
+    <td>
+      {{human_readable}}
+    </td>
+    {{/mean}}
+    <td>
+      {{description}}
+    </td>
+  </tr>
+{{/aggregate_metrics}}
+{{?aggregate_metrics}}
+</table>
+{{/aggregate_metrics}}
+
 {{?buffer_pool}}
 <h3>Buffer pool memory metrics</h3>
 <table class='table table-bordered table-hover'>