You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by as...@apache.org on 2020/02/28 23:03:21 UTC

[impala] 02/03: IMPALA-8755: Backend support for Z-ordering

This is an automated email from the ASF dual-hosted git repository.

asherman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 60a9e72faf7b8c3e034f4319c0761ed389994707
Author: norbert.luksa <no...@cloudera.com>
AuthorDate: Fri Oct 18 15:23:42 2019 +0200

    IMPALA-8755: Backend support for Z-ordering
    
    This change depends on gerrit.cloudera.org/#/c/13955/
    (Frontend support for Z-ordering)
    
    The commit adds a Comparator based on Z-ordering. See in detail:
    https://en.wikipedia.org/wiki/Z-order_curve
    
    Instead of calculating the Z-values of the rows, the comparator
    looks for the column with the most significant dimension and
    compares the values of this column only. The most significant
    dimension will be the one where the compared values have the
    highest different bits. The algorithm requires values of
    the same binary representation, therefore the values are
    converted into either uint32_t, uint64_t or uint128_t, the
    smallest in which all data fits. Comparing smaller types with
    bigger ones would make the bigger type much more dominant
    therefore the bits of these smaller types are shifted up.
    
    All primitive types (including string and floating point types)
    are supported.
    
    Testing:
     * Added unit tests.
     * Ran manual tests, comparing 4-column values with 4-bit
       integers, for all possible combinations. Checked the result by
       calculating the Z-value for each comparison.
     * Tested performance on various data, getting great results for
       selective queries. An example: used the TPCH dataset's
       lineitem table with scale 25, where the sorting columns are
       l_partkey and l_suppkey, in that order. Ran selective queries
       for the value range of the two columns, for both lexical and
       Z-ordering and compared the percentage of filtered pages and
       row groups. While queries with filters on the first column
       showed almost no difference, queries on the second column
       are in favour of Z-ordering:
       Ordering | Column | Filtered pages % | Filtered row groups %
       Lex.       1st      ~99%               ~90%
       Z-ord.     1st      ~99%               ~89%
       Lex.       2nd      ~25%               0%
       Z-ord.     2nd      ~97%               0%
       The only drawback is the sorting itself, taking ~4 times longer
       than lexical sorting (e.g. sorting for the dataset above took
       14m for Lexical, and 55m for Z-ordering).
       Note however, that this is a one-time thing to do, sorting
       only happens once, when writing the data.
       Also, lexical ordering is supported by codegen, while it is
       not implemented for Z-ordering yet.
    
    Change-Id: I0200748ce3e65ebc5d3530f794c0f80aa335a2ab
    Reviewed-on: http://gerrit.cloudera.org:8080/14080
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/exec/exchange-node.cc                       |   2 +-
 be/src/exec/hdfs-table-sink.cc                     |   1 +
 be/src/exec/hdfs-table-sink.h                      |   4 +
 be/src/exec/parquet/hdfs-parquet-table-writer.cc   |  16 +-
 be/src/exec/partial-sort-node.cc                   |   6 +-
 be/src/exec/partial-sort-node.h                    |   2 +
 be/src/exec/sort-node.cc                           |   6 +-
 be/src/exec/sort-node.h                            |   2 +
 be/src/exec/topn-node.cc                           |   2 +-
 be/src/runtime/data-stream-test.cc                 |   2 +-
 be/src/runtime/sorter.cc                           |  31 +-
 be/src/runtime/sorter.h                            |   4 +-
 be/src/util/CMakeLists.txt                         |   2 +
 be/src/util/tuple-row-compare-test.cc              | 803 +++++++++++++++++++++
 be/src/util/tuple-row-compare.cc                   | 178 ++++-
 be/src/util/tuple-row-compare.h                    | 108 ++-
 .../java/org/apache/impala/analysis/TableDef.java  |  13 -
 .../org/apache/impala/analysis/AnalyzeDDLTest.java |  32 +-
 18 files changed, 1119 insertions(+), 95 deletions(-)

diff --git a/be/src/exec/exchange-node.cc b/be/src/exec/exchange-node.cc
index 5a59398..a3e0726 100644
--- a/be/src/exec/exchange-node.cc
+++ b/be/src/exec/exchange-node.cc
@@ -107,7 +107,7 @@ Status ExchangeNode::Prepare(RuntimeState* state) {
 
   if (is_merging_) {
     less_than_.reset(
-        new TupleRowComparator(ordering_exprs_, is_asc_order_, nulls_first_));
+        new TupleRowLexicalComparator(ordering_exprs_, is_asc_order_, nulls_first_));
     state->CheckAndAddCodegenDisabledMessage(runtime_profile());
   }
   return Status::OK();
diff --git a/be/src/exec/hdfs-table-sink.cc b/be/src/exec/hdfs-table-sink.cc
index 2674e72..05a00ee 100644
--- a/be/src/exec/hdfs-table-sink.cc
+++ b/be/src/exec/hdfs-table-sink.cc
@@ -82,6 +82,7 @@ HdfsTableSink::HdfsTableSink(TDataSinkId sink_id, const HdfsTableSinkConfig& sin
     overwrite_(hdfs_sink.overwrite),
     input_is_clustered_(hdfs_sink.input_is_clustered),
     sort_columns_(hdfs_sink.sort_columns),
+    sorting_order_((TSortingOrder::type)hdfs_sink.sorting_order),
     current_clustered_partition_(nullptr),
     partition_key_exprs_(sink_config.partition_key_exprs_) {
   if (hdfs_sink.__isset.write_id) {
diff --git a/be/src/exec/hdfs-table-sink.h b/be/src/exec/hdfs-table-sink.h
index 2e790fd..0ea7009 100644
--- a/be/src/exec/hdfs-table-sink.h
+++ b/be/src/exec/hdfs-table-sink.h
@@ -173,6 +173,7 @@ class HdfsTableSink : public DataSink {
 
   int skip_header_line_count() const { return skip_header_line_count_; }
   const vector<int32_t>& sort_columns() const { return sort_columns_; }
+  TSortingOrder::type sorting_order() const { return sorting_order_; }
   const HdfsTableDescriptor& TableDesc() { return *table_desc_; }
 
   RuntimeProfile::Counter* rows_inserted_counter() { return rows_inserted_counter_; }
@@ -291,6 +292,9 @@ class HdfsTableSink : public DataSink {
   // populate the RowGroup::sorting_columns list in parquet files.
   const std::vector<int32_t>& sort_columns_;
 
+  // Represents the sorting order used in SORT BY queries.
+  const TSortingOrder::type sorting_order_;
+
   /// Stores the current partition during clustered inserts across subsequent row batches.
   /// Only set if 'input_is_clustered_' is true.
   PartitionPair* current_clustered_partition_;
diff --git a/be/src/exec/parquet/hdfs-parquet-table-writer.cc b/be/src/exec/parquet/hdfs-parquet-table-writer.cc
index b6a4391..7225384 100644
--- a/be/src/exec/parquet/hdfs-parquet-table-writer.cc
+++ b/be/src/exec/parquet/hdfs-parquet-table-writer.cc
@@ -1394,13 +1394,17 @@ Status HdfsParquetTableWriter::FlushCurrentRowGroup() {
   }
 
   // Populate RowGroup::sorting_columns with all columns specified by the Frontend.
-  for (int col_idx : parent_->sort_columns()) {
-    current_row_group_->sorting_columns.push_back(parquet::SortingColumn());
-    parquet::SortingColumn& sorting_column = current_row_group_->sorting_columns.back();
-    sorting_column.column_idx = col_idx;
-    sorting_column.descending = false;
-    sorting_column.nulls_first = false;
+  // Do that only if the sorting order is lexical.
+  if (parent_->sorting_order() == TSortingOrder::LEXICAL){
+    for (int col_idx : parent_->sort_columns()) {
+      current_row_group_->sorting_columns.push_back(parquet::SortingColumn());
+      parquet::SortingColumn& sorting_column = current_row_group_->sorting_columns.back();
+      sorting_column.column_idx = col_idx;
+      sorting_column.descending = false;
+      sorting_column.nulls_first = false;
+    }
   }
+
   current_row_group_->__isset.sorting_columns =
       !current_row_group_->sorting_columns.empty();
 
diff --git a/be/src/exec/partial-sort-node.cc b/be/src/exec/partial-sort-node.cc
index f8cdf8b..1586f46 100644
--- a/be/src/exec/partial-sort-node.cc
+++ b/be/src/exec/partial-sort-node.cc
@@ -38,6 +38,8 @@ Status PartialSortPlanNode::Init(const TPlanNode& tnode, RuntimeState* state) {
       *children_[0]->row_descriptor_, state, &sort_tuple_slot_exprs_));
   is_asc_order_ = tnode.sort_node.sort_info.is_asc_order;
   nulls_first_ = tnode.sort_node.sort_info.nulls_first;
+  sorting_order_ = static_cast<TSortingOrder::type>(
+      tnode.sort_node.sort_info.sorting_order);
   return Status::OK();
 }
 
@@ -58,6 +60,7 @@ PartialSortNode::PartialSortNode(
   sort_tuple_exprs_ = pnode.sort_tuple_slot_exprs_;
   is_asc_order_ = pnode.is_asc_order_;
   nulls_first_ = pnode.nulls_first_;
+  sorting_order_ = pnode.sorting_order_;
   runtime_profile()->AddInfoString("SortType", "Partial");
 }
 
@@ -70,7 +73,8 @@ Status PartialSortNode::Prepare(RuntimeState* state) {
   RETURN_IF_ERROR(ExecNode::Prepare(state));
   sorter_.reset(new Sorter(ordering_exprs_, is_asc_order_, nulls_first_,
       sort_tuple_exprs_, &row_descriptor_, mem_tracker(), buffer_pool_client(),
-      resource_profile_.spillable_buffer_size, runtime_profile(), state, label(), false));
+      resource_profile_.spillable_buffer_size, runtime_profile(), state, label(), false,
+      sorting_order_));
   RETURN_IF_ERROR(sorter_->Prepare(pool_));
   DCHECK_GE(resource_profile_.min_reservation, sorter_->ComputeMinReservation());
   state->CheckAndAddCodegenDisabledMessage(runtime_profile());
diff --git a/be/src/exec/partial-sort-node.h b/be/src/exec/partial-sort-node.h
index 12d460a..9cbe988 100644
--- a/be/src/exec/partial-sort-node.h
+++ b/be/src/exec/partial-sort-node.h
@@ -39,6 +39,7 @@ class PartialSortPlanNode : public PlanNode {
 
   std::vector<bool> is_asc_order_;
   std::vector<bool> nulls_first_;
+  TSortingOrder::type sorting_order_;
 };
 
 /// Node that implements a partial sort, where its input is divided up into runs, each
@@ -85,6 +86,7 @@ class PartialSortNode : public ExecNode {
 
   std::vector<bool> is_asc_order_;
   std::vector<bool> nulls_first_;
+  TSortingOrder::type sorting_order_;
 
   /////////////////////////////////////////
   /// BEGIN: Members that must be Reset()
diff --git a/be/src/exec/sort-node.cc b/be/src/exec/sort-node.cc
index 2efe40e..5943fc0 100644
--- a/be/src/exec/sort-node.cc
+++ b/be/src/exec/sort-node.cc
@@ -37,6 +37,8 @@ Status SortPlanNode::Init(const TPlanNode& tnode, RuntimeState* state) {
       *children_[0]->row_descriptor_, state, &sort_tuple_slot_exprs_));
   is_asc_order_ = tnode.sort_node.sort_info.is_asc_order;
   nulls_first_ = tnode.sort_node.sort_info.nulls_first;
+  sorting_order_ = static_cast<TSortingOrder::type>(
+      tnode.sort_node.sort_info.sorting_order);
   return Status::OK();
 }
 
@@ -56,6 +58,7 @@ SortNode::SortNode(
   sort_tuple_exprs_ = pnode.sort_tuple_slot_exprs_;
   is_asc_order_ = pnode.is_asc_order_;
   nulls_first_ = pnode.nulls_first_;
+  sorting_order_ = pnode.sorting_order_;
   runtime_profile()->AddInfoString("SortType", "Total");
 }
 
@@ -67,7 +70,8 @@ Status SortNode::Prepare(RuntimeState* state) {
   RETURN_IF_ERROR(ExecNode::Prepare(state));
   sorter_.reset(new Sorter(ordering_exprs_, is_asc_order_, nulls_first_,
       sort_tuple_exprs_, &row_descriptor_, mem_tracker(), buffer_pool_client(),
-      resource_profile_.spillable_buffer_size, runtime_profile(), state, label(), true));
+      resource_profile_.spillable_buffer_size, runtime_profile(), state, label(), true,
+      sorting_order_));
   RETURN_IF_ERROR(sorter_->Prepare(pool_));
   DCHECK_GE(resource_profile_.min_reservation, sorter_->ComputeMinReservation());
   state->CheckAndAddCodegenDisabledMessage(runtime_profile());
diff --git a/be/src/exec/sort-node.h b/be/src/exec/sort-node.h
index f6fdd52..8de3c01 100644
--- a/be/src/exec/sort-node.h
+++ b/be/src/exec/sort-node.h
@@ -39,6 +39,7 @@ class SortPlanNode : public PlanNode {
 
   std::vector<bool> is_asc_order_;
   std::vector<bool> nulls_first_;
+  TSortingOrder::type sorting_order_;
 };
 
 /// Node that implements a full sort of its input with a fixed memory budget, spilling
@@ -82,6 +83,7 @@ class SortNode : public ExecNode {
 
   std::vector<bool> is_asc_order_;
   std::vector<bool> nulls_first_;
+  TSortingOrder::type sorting_order_;
 
   /// Whether the previous call to GetNext() returned a buffer attached to the RowBatch.
   /// Used to avoid unnecessary calls to ReleaseUnusedReservation().
diff --git a/be/src/exec/topn-node.cc b/be/src/exec/topn-node.cc
index b67e03a..7f21316 100644
--- a/be/src/exec/topn-node.cc
+++ b/be/src/exec/topn-node.cc
@@ -88,7 +88,7 @@ Status TopNNode::Prepare(RuntimeState* state) {
   RETURN_IF_ERROR(ScalarExprEvaluator::Create(output_tuple_exprs_, state, pool_,
       expr_perm_pool(), expr_results_pool(), &output_tuple_expr_evals_));
   tuple_row_less_than_.reset(
-      new TupleRowComparator(ordering_exprs_, is_asc_order_, nulls_first_));
+      new TupleRowLexicalComparator(ordering_exprs_, is_asc_order_, nulls_first_));
   output_tuple_desc_ = row_descriptor_.tuple_descriptors()[0];
   insert_batch_timer_ = ADD_TIMER(runtime_profile(), "InsertBatchTime");
   state->CheckAndAddCodegenDisabledMessage(runtime_profile());
diff --git a/be/src/runtime/data-stream-test.cc b/be/src/runtime/data-stream-test.cc
index d8dc832..23720ce 100644
--- a/be/src/runtime/data-stream-test.cc
+++ b/be/src/runtime/data-stream-test.cc
@@ -346,7 +346,7 @@ class DataStreamTest : public testing::Test {
     SlotRef* lhs_slot = obj_pool_.Add(new SlotRef(TYPE_BIGINT, 0));
     ASSERT_OK(lhs_slot->Init(RowDescriptor(), true, runtime_state_.get()));
     ordering_exprs_.push_back(lhs_slot);
-    less_than_ = obj_pool_.Add(new TupleRowComparator(ordering_exprs_,
+    less_than_ = obj_pool_.Add(new TupleRowLexicalComparator(ordering_exprs_,
         is_asc_, nulls_first_));
     ASSERT_OK(less_than_->Open(
         &obj_pool_, runtime_state_.get(), mem_pool_.get(), mem_pool_.get()));
diff --git a/be/src/runtime/sorter.cc b/be/src/runtime/sorter.cc
index e2f7a76..947d49d 100644
--- a/be/src/runtime/sorter.cc
+++ b/be/src/runtime/sorter.cc
@@ -763,12 +763,13 @@ Sorter::Sorter(const std::vector<ScalarExpr*>& ordering_exprs,
     const vector<ScalarExpr*>& sort_tuple_exprs, RowDescriptor* output_row_desc,
     MemTracker* mem_tracker, BufferPool::ClientHandle* buffer_pool_client,
     int64_t page_len, RuntimeProfile* profile, RuntimeState* state,
-    const string& node_label, bool enable_spilling)
+    const string& node_label, bool enable_spilling,
+    TSortingOrder::type sorting_order)
   : node_label_(node_label),
     state_(state),
     expr_perm_pool_(mem_tracker),
     expr_results_pool_(mem_tracker),
-    compare_less_than_(ordering_exprs, is_asc_order, nulls_first),
+    compare_less_than_(nullptr),
     in_mem_tuple_sorter_(nullptr),
     buffer_pool_client_(buffer_pool_client),
     page_len_(page_len),
@@ -784,7 +785,19 @@ Sorter::Sorter(const std::vector<ScalarExpr*>& ordering_exprs,
     num_merges_counter_(nullptr),
     in_mem_sort_timer_(nullptr),
     sorted_data_size_(nullptr),
-    run_sizes_(nullptr) {}
+    run_sizes_(nullptr) {
+  switch (sorting_order) {
+    case TSortingOrder::LEXICAL:
+      compare_less_than_.reset(new TupleRowLexicalComparator(ordering_exprs, is_asc_order,
+          nulls_first));
+      break;
+    case TSortingOrder::ZORDER:
+      compare_less_than_.reset(new TupleRowZOrderComparator(ordering_exprs));
+      break;
+    default:
+      DCHECK(false);
+  }
+}
 
 Sorter::~Sorter() {
   DCHECK(sorted_runs_.empty());
@@ -812,7 +825,7 @@ Status Sorter::Prepare(ObjectPool* obj_pool) {
   }
   has_var_len_slots_ = sort_tuple_desc->HasVarlenSlots();
   in_mem_tuple_sorter_.reset(
-      new TupleSorter(this, compare_less_than_, sort_tuple_desc->byte_size(), state_));
+      new TupleSorter(this, *compare_less_than_, sort_tuple_desc->byte_size(), state_));
 
   if (enable_spilling_) {
     initial_runs_counter_ = ADD_COUNTER(profile_, "InitialRunsCreated", TUnit::UNIT);
@@ -831,13 +844,13 @@ Status Sorter::Prepare(ObjectPool* obj_pool) {
 }
 
 Status Sorter::Codegen(RuntimeState* state) {
-  return compare_less_than_.Codegen(state);
+  return compare_less_than_->Codegen(state);
 }
 
 Status Sorter::Open() {
   DCHECK(in_mem_tuple_sorter_ != nullptr) << "Not prepared";
   DCHECK(unsorted_run_ == nullptr) << "Already open";
-  RETURN_IF_ERROR(compare_less_than_.Open(&obj_pool_, state_, &expr_perm_pool_,
+  RETURN_IF_ERROR(compare_less_than_->Open(&obj_pool_, state_, &expr_perm_pool_,
       &expr_results_pool_));
   TupleDescriptor* sort_tuple_desc = output_row_desc_->tuple_descriptors()[0];
   unsorted_run_ = run_pool_.Add(new Run(this, sort_tuple_desc, true));
@@ -927,12 +940,12 @@ void Sorter::Reset() {
   merger_.reset();
   // Free resources from the current runs.
   CleanupAllRuns();
-  compare_less_than_.Close(state_);
+  compare_less_than_->Close(state_);
 }
 
 void Sorter::Close(RuntimeState* state) {
   CleanupAllRuns();
-  compare_less_than_.Close(state);
+  compare_less_than_->Close(state);
   ScalarExprEvaluator::Close(sort_tuple_expr_evals_, state);
   expr_perm_pool_.FreeAll();
   expr_results_pool_.FreeAll();
@@ -1069,7 +1082,7 @@ Status Sorter::CreateMerger(int num_runs) {
   // from the runs being merged. This is unnecessary overhead that is not required if we
   // correctly transfer resources.
   merger_.reset(
-      new SortedRunMerger(compare_less_than_, output_row_desc_, profile_, true));
+      new SortedRunMerger(*compare_less_than_, output_row_desc_, profile_, true));
 
   vector<function<Status (RowBatch**)>> merge_runs;
   merge_runs.reserve(num_runs);
diff --git a/be/src/runtime/sorter.h b/be/src/runtime/sorter.h
index 2040b46..cb06fab 100644
--- a/be/src/runtime/sorter.h
+++ b/be/src/runtime/sorter.h
@@ -108,7 +108,7 @@ class Sorter {
       const std::vector<ScalarExpr*>& sort_tuple_exprs, RowDescriptor* output_row_desc,
       MemTracker* mem_tracker, BufferPool::ClientHandle* client, int64_t page_len,
       RuntimeProfile* profile, RuntimeState* state, const std::string& node_label,
-      bool enable_spilling);
+      bool enable_spilling, TSortingOrder::type sorting_order);
   ~Sorter();
 
   /// Initial set-up of the sorter for execution.
@@ -221,7 +221,7 @@ class Sorter {
   MemPool expr_results_pool_;
 
   /// In memory sorter and less-than comparator.
-  TupleRowComparator compare_less_than_;
+  boost::scoped_ptr<TupleRowComparator> compare_less_than_;
   boost::scoped_ptr<TupleSorter> in_mem_tuple_sorter_;
 
   /// Client used to allocate pages from the buffer pool. Not owned.
diff --git a/be/src/util/CMakeLists.txt b/be/src/util/CMakeLists.txt
index 21127cb..c8cd735 100644
--- a/be/src/util/CMakeLists.txt
+++ b/be/src/util/CMakeLists.txt
@@ -143,6 +143,7 @@ add_library(UtilTests STATIC
   system-state-info-test.cc
   thread-pool-test.cc
   time-test.cc
+  tuple-row-compare-test.cc
   uid-util-test.cc
   zip-util-test.cc
 )
@@ -207,6 +208,7 @@ ADD_UNIFIED_BE_LSAN_TEST(system-state-info-test "SystemStateInfoTest.*")
 ADD_UNIFIED_BE_LSAN_TEST(sys-info-test "CpuInfoTest.*:DiskInfoTest.*")
 ADD_UNIFIED_BE_LSAN_TEST(thread-pool-test "ThreadPoolTest.*")
 ADD_UNIFIED_BE_LSAN_TEST(time-test "TimeTest.*")
+ADD_UNIFIED_BE_LSAN_TEST(tuple-row-compare-test "TupleRowCompareTest.*")
 ADD_UNIFIED_BE_LSAN_TEST(uid-util-test "UidUtil.*")
 # Using standalone webserver-test for now, nonstandard main() passes in a port.
 ADD_BE_LSAN_TEST(webserver-test)
diff --git a/be/src/util/tuple-row-compare-test.cc b/be/src/util/tuple-row-compare-test.cc
new file mode 100644
index 0000000..2c1b196
--- /dev/null
+++ b/be/src/util/tuple-row-compare-test.cc
@@ -0,0 +1,803 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <boost/scoped_ptr.hpp>
+
+#include "exprs/slot-ref.h"
+#include "runtime/mem-pool.h"
+#include "runtime/mem-tracker.h"
+#include "runtime/test-env.h"
+#include "runtime/timestamp-value.inline.h"
+#include "runtime/tuple-row.h"
+#include "runtime/types.h"
+#include "testutil/gtest-util.h"
+#include "util/tuple-row-compare.h"
+
+#include "common/names.h"
+
+namespace impala {
+
+class TupleRowCompareTest : public testing::Test {
+ public:
+  TupleRowCompareTest() : expr_perm_pool_(&tracker_), expr_results_pool_(&tracker_) {}
+
+ protected:
+  typedef __uint128_t uint128_t;
+  /// Temporary runtime environment for the TupleRowComparator.
+  scoped_ptr<TestEnv> test_env_;
+  /// Not set by SetUp(); null-initialized so Open()/Close() get a defined value.
+  RuntimeState* runtime_state_ = nullptr;
+  RowDescriptor desc_;
+  ObjectPool pool_;
+  /// A dummy MemTracker used for exprs and other things we don't need to have limits on.
+  MemTracker tracker_;
+  MemPool expr_perm_pool_;
+  MemPool expr_results_pool_;
+  /// The Z-order comparator under test; (re)created by LoadComperator().
+  scoped_ptr<TupleRowZOrderComparator> comperator_;
+  vector<ScalarExpr*> ordering_exprs_;
+
+  virtual void SetUp() {
+    test_env_.reset(new TestEnv());
+    ASSERT_OK(test_env_->Init());
+  }
+
+  /// Releases per-test state; safe even if no comparator was ever created.
+  virtual void TearDown() {
+    if (comperator_ != nullptr) comperator_->Close(runtime_state_);
+    ScalarExpr::Close(ordering_exprs_);
+    runtime_state_ = nullptr;
+    test_env_.reset();
+    expr_perm_pool_.FreeAll();
+    expr_results_pool_.FreeAll();
+    pool_.Clear();
+  }
+
+  void LoadComperator(int offset) {
+    comperator_.reset(new TupleRowZOrderComparator(ordering_exprs_));
+    ASSERT_OK(comperator_->Open(&pool_, runtime_state_, &expr_perm_pool_,
+        &expr_results_pool_));
+  }
+
+  // Only ColumnType should be passed as template parameter.
+  template <typename... Types>
+  void LoadComperator(int offset, ColumnType type, Types... types) {
+    //We are trying to fit into one slot, so the offset has to be < sizeof(int)*8.
+    DCHECK_LT(offset, sizeof(int) * 8) << "Too many columns added.";
+    SlotRef* build_expr = pool_.Add(new SlotRef(type, offset, true /* nullable */));
+    ASSERT_OK(build_expr->Init(desc_, true, nullptr));
+    ordering_exprs_.push_back(build_expr);
+    LoadComperator(offset + type.GetSlotSize(), types...);
+  }
+
+  template <typename... Types>
+  void CreateComperator(Types... types) {
+    ordering_exprs_.clear();
+    LoadComperator(1, types...);
+  }
+
+  template <bool IS_FIRST_SLOT_NULL = false, typename... Args>
+  TupleRow* CreateTupleRow(Args... args) {
+    // Only one null byte is allocated in front of the slots, so at most 8 slots fit.
+    DCHECK_LE(sizeof...(args), 8);
+    uint8_t* tuple_row_mem = expr_perm_pool_.Allocate(
+        sizeof(char*) + sizeof(int32_t*) * sizeof...(args));
+    Tuple* tuple_mem = Tuple::Create(sizeof(char) + GetSize(args...), &expr_perm_pool_);
+    if (IS_FIRST_SLOT_NULL) tuple_mem->SetNull(NullIndicatorOffset(0, 1));
+    FillMem(tuple_mem, 1, args...);
+    TupleRow* row = reinterpret_cast<TupleRow*>(tuple_row_mem);
+    row->SetTuple(0, tuple_mem);
+    return row;
+  }
+
+  unsigned GetSize() { return 0; }
+
+  template <typename T, typename... Args>
+  unsigned GetSize(const T & head, const Args &... tail) {
+      return sizeof(T) + GetSize(tail...);
+  }
+
+  template <typename T>
+  void FillMem(Tuple* tuple_mem, int idx, T val) {
+    *reinterpret_cast<T*>(tuple_mem->GetSlot(idx)) = val;
+  }
+
+  template <typename T, typename... Args>
+  void FillMem(Tuple* tuple_mem, int idx, T val, Args... args) {
+    *reinterpret_cast<T*>(tuple_mem->GetSlot(idx)) = val;
+    FillMem(tuple_mem, idx + sizeof(T), args...);
+  }
+
+  template <typename T, bool IS_FIRST_SLOT_NULL = false>
+  int CompareTest(T lval1, T lval2, T rval1, T rval2) {
+    TupleRow* lhs = CreateTupleRow<IS_FIRST_SLOT_NULL>(lval1, lval2);
+    TupleRow* rhs = CreateTupleRow(rval1, rval2);
+    int result = comperator_->Compare(lhs, rhs);
+    comperator_->Close(runtime_state_);
+    return result;
+  }
+
+  // With this function, nullable entries can also be tested, by setting the
+  // template parameter to true. This means that the first slot of the
+  // left hand side is set to null, which should be equal to the
+  // smallest possible value when comparing.
+  template <bool IS_FIRST_SLOT_NULL = false>
+  int IntIntTest(int32_t lval1, int32_t lval2, int32_t rval1, int32_t rval2) {
+    CreateComperator(ColumnType(TYPE_INT), ColumnType(TYPE_INT));
+    return CompareTest<int32_t, IS_FIRST_SLOT_NULL>(lval1, lval2, rval1, rval2);
+  }
+
+  int Int64Int64Test(int64_t lval1, int64_t lval2, int64_t rval1, int64_t rval2) {
+    CreateComperator(ColumnType(TYPE_BIGINT), ColumnType(TYPE_BIGINT));
+    return CompareTest<int64_t>(lval1, lval2, rval1, rval2);
+  }
+
+  int Int16Int16Test(int16_t lval1, int16_t lval2, int16_t rval1, int16_t rval2) {
+    CreateComperator(ColumnType(TYPE_SMALLINT), ColumnType(TYPE_SMALLINT));
+    return CompareTest<int16_t>(lval1, lval2, rval1, rval2);
+  }
+
+  int Int8Int8Test(int8_t lval1, int8_t lval2, int8_t rval1, int8_t rval2) {
+    CreateComperator(ColumnType(TYPE_TINYINT), ColumnType(TYPE_TINYINT));
+    return CompareTest<int8_t>(lval1, lval2, rval1, rval2);
+  }
+
+  int FloatFloatTest(float lval1, float lval2, float rval1, float rval2) {
+    CreateComperator(ColumnType(TYPE_FLOAT), ColumnType(TYPE_FLOAT));
+    return CompareTest<float>(lval1, lval2, rval1, rval2);
+  }
+
+  int DoubleDoubleTest(double lval1, double lval2, double rval1, double rval2) {
+    CreateComperator(ColumnType(TYPE_DOUBLE), ColumnType(TYPE_DOUBLE));
+    return CompareTest<double>(lval1, lval2, rval1, rval2);
+  }
+
+  int BoolBoolTest(bool lval1, bool lval2, bool rval1, bool rval2) {
+    CreateComperator(ColumnType(TYPE_BOOLEAN), ColumnType(TYPE_BOOLEAN));
+    return CompareTest<bool>(lval1, lval2, rval1, rval2);
+  }
+
+  // Char is a special case, so its tuples have to be created differently.
+  // This function is responsible for only the char test below, therefore
+  // the tuple will have a fixed size of two slots.
+  TupleRow* CreateCharArrayTupleRow(const char* ptr1, const char* ptr2) {
+    uint8_t* tuple_row_mem = expr_perm_pool_.Allocate(
+        sizeof(char*) + sizeof(int32_t*) * 2);
+    Tuple* tuple_mem =
+        Tuple::Create(sizeof(char) + strlen(ptr1) + strlen(ptr2), &expr_perm_pool_);
+    memcpy(tuple_mem->GetSlot(1), ptr1, strlen(ptr1));
+    memcpy(tuple_mem->GetSlot(1 + strlen(ptr1)), ptr2, strlen(ptr2));
+    TupleRow* row = reinterpret_cast<TupleRow*>(tuple_row_mem);
+    row->SetTuple(0, tuple_mem);
+    return row;
+  }
+
+  int CharCharTest(const char *lval1, const char *lval2, const char *rval1,
+      const char *rval2) {
+    DCHECK_EQ(strlen(lval1), strlen(rval1));
+    DCHECK_EQ(strlen(lval2), strlen(rval2));
+    CreateComperator(ColumnType::CreateCharType(strlen(lval1)),
+        ColumnType::CreateCharType(strlen(lval2)));
+
+    TupleRow* lhs = CreateCharArrayTupleRow(lval1, lval2);
+    TupleRow* rhs = CreateCharArrayTupleRow(rval1, rval2);
+    int result = comperator_->Compare(lhs, rhs);
+    comperator_->Close(runtime_state_);
+    return result;
+  }
+
+  int DateDateTest(DateValue lval1, DateValue lval2, DateValue rval1, DateValue rval2) {
+    CreateComperator(ColumnType(TYPE_DATE), ColumnType(TYPE_DATE));
+    return CompareTest<DateValue>(lval1, lval2, rval1, rval2);
+  }
+
+  int TimestampTimestampTest(const std::string& lval1, const std::string& lval2,
+      const std::string& rval1, const std::string& rval2) {
+    return TimestampTimestampTest(
+        TimestampValue::ParseSimpleDateFormat(lval1),
+        TimestampValue::ParseSimpleDateFormat(lval2),
+        TimestampValue::ParseSimpleDateFormat(rval1),
+        TimestampValue::ParseSimpleDateFormat(rval2));
+  }
+
+  int TimestampTimestampTest(int64_t lval1, int64_t lval2, int64_t rval1, int64_t rval2) {
+    return TimestampTimestampTest(TimestampValue::FromDaysSinceUnixEpoch(lval1),
+        TimestampValue::FromDaysSinceUnixEpoch(lval2),
+        TimestampValue::FromDaysSinceUnixEpoch(rval1),
+        TimestampValue::FromDaysSinceUnixEpoch(rval2));
+  }
+
+  int TimestampTimestampTest(TimestampValue lval1, TimestampValue lval2,
+      TimestampValue rval1, TimestampValue rval2) {
+    CreateComperator(ColumnType(TYPE_TIMESTAMP), ColumnType(TYPE_TIMESTAMP));
+    return CompareTest<TimestampValue>(lval1, lval2, rval1, rval2);
+  }
+
+  template<typename DECIMAL_T>
+  int DecimalDecimalTest(int64_t lval1, int64_t lval2, int64_t rval1, int64_t rval2,
+      int precision, int scale) {
+    ColumnType decimal_column = ColumnType::CreateDecimalType(precision, scale);
+    CreateComperator(decimal_column, decimal_column);
+    bool overflow = false;
+    TupleRow* lhs = CreateTupleRow(DECIMAL_T::FromInt(precision, scale, lval1, &overflow),
+        DECIMAL_T::FromInt(precision, scale, lval2, &overflow));
+    TupleRow* rhs = CreateTupleRow(DECIMAL_T::FromInt(precision, scale, rval1, &overflow),
+        DECIMAL_T::FromInt(precision, scale, rval2, &overflow));
+    int result = comperator_->Compare(lhs, rhs);
+    comperator_->Close(runtime_state_);
+    return result;
+  }
+
+  // The number of bytes to compare in a string depends on the size of the types present
+  // in a row. If there are only strings present, only the first 4 bytes will be
+  // considered. This is decided in TupleRowZOrderComparator::CompareInterpreted, where
+  // the GetByteSize returns 0 for strings.
+  int StringString4ByteTest(const std::string& lval1, const std::string& lval2,
+      const std::string& rval1, const std::string& rval2) {
+    CreateComperator(ColumnType(TYPE_STRING), ColumnType(TYPE_STRING));
+    return CompareTest<StringValue>(StringValue(lval1), StringValue(lval2),
+        StringValue(rval1), StringValue(rval2));
+  }
+
+  // Requires that the comparator has already been created with the two String/Varchar
+  // columns and a column corresponding to T.
+  template<typename T>
+  int GenericStringTest(const std::string& lval1, const std::string& lval2,
+      const std::string& rval1, const std::string& rval2, T dummyValue)  {
+    TupleRow* lhs = CreateTupleRow(StringValue(lval1), StringValue(lval2), dummyValue);
+    TupleRow* rhs = CreateTupleRow(StringValue(rval1), StringValue(rval2), dummyValue);
+    int result = comperator_->Compare(lhs, rhs);
+    comperator_->Close(runtime_state_);
+    return result;
+  }
+
+  int StringString8ByteTest(const std::string& lval1, const std::string& lval2,
+      const std::string& rval1, const std::string& rval2) {
+    // TYPE_BIGINT will enforce checking the first 8 bytes of the strings.
+    CreateComperator(
+        ColumnType(TYPE_STRING), ColumnType(TYPE_STRING), ColumnType(TYPE_BIGINT));
+    int64_t dummyValue = 0;
+    return GenericStringTest<int64_t>(lval1, lval2, rval1, rval2, dummyValue);
+  }
+
+  // Compares two rows of (STRING, STRING, DECIMAL(MAX_PRECISION, 0)). The decimal
+  // column holds 0 in both rows, so only the strings can influence the result.
+  int StringString16ByteTest(const std::string& lval1, const std::string& lval2,
+      const std::string& rval1, const std::string& rval2) {
+    // The Decimal16Value column will enforce checking the first 16 bytes of the strings.
+    int precision = ColumnType::MAX_PRECISION;
+    int scale = 0;
+    // Receives the overflow flag from FromInt(); it is not inspected afterwards.
+    bool overflow = false;
+    Decimal16Value dummyValue = Decimal16Value::FromInt(precision, scale, 0, &overflow);
+    CreateComperator(ColumnType(TYPE_STRING), ColumnType(TYPE_STRING),
+        ColumnType::CreateDecimalType(precision, scale));
+    return GenericStringTest<Decimal16Value>(lval1, lval2, rval1, rval2, dummyValue);
+  }
+
+  // VARCHAR(32) counterpart of the two-column string comparison: the sort key consists
+  // of two VARCHAR(32) columns and nothing else.
+  int VarcharVarchar4ByteTest(const std::string& lval1, const std::string& lval2,
+      const std::string& rval1, const std::string& rval2) {
+    CreateComperator(ColumnType::CreateVarcharType(32),
+        ColumnType::CreateVarcharType(32));
+    // Presumably (lval1, lval2) becomes the left row and (rval1, rval2) the right row;
+    // CompareTest is defined elsewhere in this fixture.
+    return CompareTest<StringValue>(StringValue(lval1), StringValue(lval2),
+        StringValue(rval1), StringValue(rval2));
+  }
+
+  // Compares two rows of (VARCHAR(32), VARCHAR(32), BIGINT). The BIGINT column holds 0
+  // in both rows, so only the strings can influence the result.
+  int VarcharVarchar8ByteTest(const std::string& lval1, const std::string& lval2,
+      const std::string& rval1, const std::string& rval2) {
+    // TYPE_BIGINT will enforce checking the first 8 bytes of the strings.
+    CreateComperator(ColumnType::CreateVarcharType(32),
+        ColumnType::CreateVarcharType(32), ColumnType(TYPE_BIGINT));
+    int64_t dummyValue = 0;
+    // The template argument must match the BIGINT column created above; a 16-byte
+    // decimal value must not be stored for an 8-byte column.
+    return GenericStringTest<int64_t>(lval1, lval2, rval1, rval2, dummyValue);
+  }
+
+  // Compares two rows of (VARCHAR(32), VARCHAR(32), DECIMAL(MAX_PRECISION, 0)). The
+  // decimal column holds 0 in both rows, so only the strings can influence the result.
+  int VarcharVarchar16ByteTest(const std::string& lval1, const std::string& lval2,
+      const std::string& rval1, const std::string& rval2) {
+    // The Decimal16Value column will enforce checking the first 16 bytes of the strings.
+    int precision = ColumnType::MAX_PRECISION;
+    int scale = 0;
+    // Receives the overflow flag from FromInt(); it is not inspected afterwards.
+    bool overflow = false;
+    Decimal16Value dummyValue = Decimal16Value::FromInt(precision, scale, 0, &overflow);
+    CreateComperator(ColumnType::CreateVarcharType(32),
+        ColumnType::CreateVarcharType(32),
+        ColumnType::CreateDecimalType(precision, scale));
+    return GenericStringTest<Decimal16Value>(lval1, lval2, rval1, rval2, dummyValue);
+  }
+};
+
+// The Z-values used and their order are visualized in the following image:
+// https://en.wikipedia.org/wiki/File:Z-curve.svg
+// Judging by the expectations below, IntIntTest(l1, l2, r1, r2) compares the row
+// (l1, l2) with the row (r1, r2); the helper itself is defined earlier in the fixture.
+TEST_F(TupleRowCompareTest, Int32Test) {
+  // Identical rows compare as equal.
+  EXPECT_EQ(IntIntTest(0, 0, 0, 0), 0);
+  EXPECT_EQ(IntIntTest(-5, 3, -5, 3), 0);
+
+  // Both columns differ in the same highest bit, so the earlier column decides.
+  EXPECT_EQ(IntIntTest(1, 0, 0, 1), 1);
+  EXPECT_EQ(IntIntTest(0, 1, 1, 0), -1);
+
+  // Small coordinates (the first assertion repeats the one above).
+  EXPECT_EQ(IntIntTest(1, 0, 0, 1), 1);
+  EXPECT_EQ(IntIntTest(2, 4, 1, 7), 1);
+  EXPECT_EQ(IntIntTest(3, 7, 4, 0), -1);
+  EXPECT_EQ(IntIntTest(6, 4, 5, 7), 1);
+  EXPECT_EQ(IntIntTest(5, 5, 6, 4), -1);
+  EXPECT_EQ(IntIntTest(6, 1, 3, 7), 1);
+
+  // Large values: in the first case both columns differ in bit 30, so the first column
+  // decides; in the second case the first column only differs below bit 30, so the
+  // second column decides.
+  EXPECT_EQ(IntIntTest(INT32_MAX / 2 + 2, 1, 1, INT32_MAX), 1);
+  EXPECT_EQ(IntIntTest(INT32_MAX / 2, 1, 1, INT32_MAX), -1);
+
+  // Some null tests (see details at IntIntTest)
+  EXPECT_EQ(IntIntTest<true>(1, 1, 1, 1), -1);
+  EXPECT_EQ(IntIntTest<true>(4242, 1, 1, 1), -1);
+  EXPECT_EQ(IntIntTest<true>(1, 0, 0, 1), -1);
+  EXPECT_EQ(IntIntTest<true>(1, 0, INT32_MIN, 0), 0);
+}
+
+// Same scenarios as the 32-bit integer test, using the 64-bit helper.
+TEST_F(TupleRowCompareTest, Int64Test) {
+  // Identical rows compare as equal.
+  EXPECT_EQ(Int64Int64Test(0, 0, 0, 0), 0);
+  EXPECT_EQ(Int64Int64Test(-5, 3, -5, 3), 0);
+
+  // Both columns differ in the same highest bit, so the earlier column decides.
+  EXPECT_EQ(Int64Int64Test(1, 0, 0, 1), 1);
+  EXPECT_EQ(Int64Int64Test(0, 1, 1, 0), -1);
+
+  // Small coordinates (the first assertion repeats the one above).
+  EXPECT_EQ(Int64Int64Test(1, 0, 0, 1), 1);
+  EXPECT_EQ(Int64Int64Test(2, 4, 1, 7), 1);
+  EXPECT_EQ(Int64Int64Test(3, 7, 4, 0), -1);
+  EXPECT_EQ(Int64Int64Test(6, 4, 5, 7), 1);
+  EXPECT_EQ(Int64Int64Test(5, 5, 6, 4), -1);
+  EXPECT_EQ(Int64Int64Test(6, 1, 3, 7), 1);
+
+  // Values around half of INT64_MAX: the deciding column switches from the first to
+  // the second between these two cases.
+  EXPECT_EQ(Int64Int64Test(INT64_MAX / 2 + 2, 1, 1, INT64_MAX), 1);
+  EXPECT_EQ(Int64Int64Test(INT64_MAX / 2, 1, 1, INT64_MAX), -1);
+}
+
+// Same scenarios as the 32-bit integer test, using the 16-bit helper throughout.
+TEST_F(TupleRowCompareTest, Int16Test) {
+  // Identical rows compare as equal.
+  EXPECT_EQ(Int16Int16Test(0, 0, 0, 0), 0);
+  EXPECT_EQ(Int16Int16Test(-5, 3, -5, 3), 0);
+
+  // Both columns differ in the same highest bit, so the earlier column decides.
+  EXPECT_EQ(Int16Int16Test(1, 0, 0, 1), 1);
+  EXPECT_EQ(Int16Int16Test(0, 1, 1, 0), -1);
+
+  // Small coordinates (the first assertion repeats the one above).
+  EXPECT_EQ(Int16Int16Test(1, 0, 0, 1), 1);
+  EXPECT_EQ(Int16Int16Test(2, 4, 1, 7), 1);
+  EXPECT_EQ(Int16Int16Test(3, 7, 4, 0), -1);
+  EXPECT_EQ(Int16Int16Test(6, 4, 5, 7), 1);
+  EXPECT_EQ(Int16Int16Test(5, 5, 6, 4), -1);
+  EXPECT_EQ(Int16Int16Test(6, 1, 3, 7), 1);
+
+  // Values around half of INT16_MAX: the deciding column switches from the first to
+  // the second between these two cases.
+  EXPECT_EQ(Int16Int16Test(INT16_MAX / 2 + 2, 1, 1, INT16_MAX), 1);
+  EXPECT_EQ(Int16Int16Test(INT16_MAX / 2, 1, 1, INT16_MAX), -1);
+}
+
+// Same scenarios as the 32-bit integer test, using the 8-bit helper.
+TEST_F(TupleRowCompareTest, Int8Test) {
+  // Identical rows compare as equal.
+  EXPECT_EQ(Int8Int8Test(0, 0, 0, 0), 0);
+  EXPECT_EQ(Int8Int8Test(-5, 3, -5, 3), 0);
+
+  // Both columns differ in the same highest bit, so the earlier column decides.
+  EXPECT_EQ(Int8Int8Test(1, 0, 0, 1), 1);
+  EXPECT_EQ(Int8Int8Test(0, 1, 1, 0), -1);
+
+  // Small coordinates (the first assertion repeats the one above).
+  EXPECT_EQ(Int8Int8Test(1, 0, 0, 1), 1);
+  EXPECT_EQ(Int8Int8Test(2, 4, 1, 7), 1);
+  EXPECT_EQ(Int8Int8Test(3, 7, 4, 0), -1);
+  EXPECT_EQ(Int8Int8Test(6, 4, 5, 7), 1);
+  EXPECT_EQ(Int8Int8Test(5, 5, 6, 4), -1);
+  EXPECT_EQ(Int8Int8Test(6, 1, 3, 7), 1);
+
+  // Values around half of INT8_MAX: the deciding column switches from the first to
+  // the second between these two cases.
+  EXPECT_EQ(Int8Int8Test(INT8_MAX / 2 + 2, 1, 1, INT8_MAX), 1);
+  EXPECT_EQ(Int8Int8Test(INT8_MAX / 2, 1, 1, INT8_MAX), -1);
+}
+
+// Two-column comparisons on float values; FloatFloatTest is defined earlier in the
+// fixture.
+TEST_F(TupleRowCompareTest, FloatTest) {
+  // Both columns differ by the same bit pattern, so the earlier column decides.
+  EXPECT_EQ(FloatFloatTest(1.0f, 0.0f, 0.0f, 1.0f), 1);
+  EXPECT_EQ(FloatFloatTest(0.0f, 1.0f, 1.0f, 0.0f), -1);
+
+  // Positive values.
+  EXPECT_EQ(FloatFloatTest(4.0f, 3.0f, 3.0f, 4.0f), 1);
+  EXPECT_EQ(FloatFloatTest(5.0f, 7.0f, 4.0f, 10.0f), -1);
+  EXPECT_EQ(FloatFloatTest(6.0f, 10.0f, 7.0f, 3.0f), 1);
+  EXPECT_EQ(FloatFloatTest(9.0f, 7.0f, 8.0f, 10.0f), -1);
+  EXPECT_EQ(FloatFloatTest(8.0f , 8.0f, 9.0f, 7.0f), 1);
+  EXPECT_EQ(FloatFloatTest(9.0f, 4.0f, 6.0f, 10.0f), 1);
+
+  // The same magnitudes negated: every expected result has the opposite sign.
+  EXPECT_EQ(FloatFloatTest(-4.0f, -3.0f, -3.0f, -4.0f), -1);
+  EXPECT_EQ(FloatFloatTest(-5.0f, -7.0f, -4.0f, -10.0f), 1);
+  EXPECT_EQ(FloatFloatTest(-6.0f, -10.0f, -7.0f, -3.0f), -1);
+  EXPECT_EQ(FloatFloatTest(-9.0f, -7.0f, -8.0f, -10.0f), 1);
+  EXPECT_EQ(FloatFloatTest(-8.0f, -8.0f, -9.0f, -7.0f), -1);
+  EXPECT_EQ(FloatFloatTest(-9.0f, -4.0f, -6.0f, -10.0f), -1);
+
+  // Values near the top of the float range.
+  EXPECT_EQ(FloatFloatTest(FLT_MAX / 2.0f + 2.0f, 1.0f, 1.0f, FLT_MAX), 1);
+}
+
+// Two-column comparisons on double values; DoubleDoubleTest is defined earlier in the
+// fixture.
+TEST_F(TupleRowCompareTest, DoubleTest) {
+  // Both columns differ by the same bit pattern, so the earlier column decides.
+  // NOTE(review): the last argument is written as a float literal (1.0f / 0.0f) while
+  // the others are double literals; both values are exactly representable.
+  EXPECT_EQ(DoubleDoubleTest(1.0, 0.0, 0.0, 1.0f), 1);
+  EXPECT_EQ(DoubleDoubleTest(0.0, 1.0, 1.0, 0.0f), -1);
+
+  // Positive values.
+  EXPECT_EQ(DoubleDoubleTest(4.0, 3.0, 3.0, 4.0), 1);
+  EXPECT_EQ(DoubleDoubleTest(5.0, 7.0, 4.0, 10.0), -1);
+  EXPECT_EQ(DoubleDoubleTest(6.0, 10.0, 7.0, 3.0), 1);
+  EXPECT_EQ(DoubleDoubleTest(9.0, 7.0, 8.0, 10.0), -1);
+  EXPECT_EQ(DoubleDoubleTest(8.0, 8.0, 9.0, 7.0), 1);
+  EXPECT_EQ(DoubleDoubleTest(9.0, 4.0, 6.0, 10.0), 1);
+
+  // The same magnitudes negated: every expected result has the opposite sign.
+  EXPECT_EQ(DoubleDoubleTest(-4.0, -3.0, -3.0, -4.0), -1);
+  EXPECT_EQ(DoubleDoubleTest(-5.0, -7.0, -4.0, -10.0), 1);
+  EXPECT_EQ(DoubleDoubleTest(-6.0, -10.0, -7.0, -3.0), -1);
+  EXPECT_EQ(DoubleDoubleTest(-9.0, -7.0, -8.0, -10.0), 1);
+  EXPECT_EQ(DoubleDoubleTest(-8.0, -8.0, -9.0, -7.0), -1);
+  EXPECT_EQ(DoubleDoubleTest(-9.0, -4.0, -6.0, -10.0), -1);
+
+  // Values near the top of the double range.
+  EXPECT_EQ(DoubleDoubleTest(DBL_MAX / 2.0 + 2.0, 1.0, 1.0, DBL_MAX), 1);
+}
+
+// Two-column comparisons on boolean values; BoolBoolTest is defined earlier in the
+// fixture.
+TEST_F(TupleRowCompareTest, BoolTest) {
+  // Identical rows compare as equal.
+  EXPECT_EQ(BoolBoolTest(true, false, true, false), 0);
+  EXPECT_EQ(BoolBoolTest(false, true, false, true), 0);
+
+  // Exactly one column differs in each case; that column decides, with false < true.
+  EXPECT_EQ(BoolBoolTest(true, true, true, false), 1);
+  EXPECT_EQ(BoolBoolTest(false, true, true, true), -1);
+  EXPECT_EQ(BoolBoolTest(false, true, false, false), 1);
+  EXPECT_EQ(BoolBoolTest(false, false, false, true), -1);
+  EXPECT_EQ(BoolBoolTest(true, false, false, false), 1);
+}
+
+// Two-column comparisons on CHAR values, first with one-character and then with
+// three-character strings; CharCharTest is defined earlier in the fixture.
+TEST_F(TupleRowCompareTest, CharTest) {
+  // Identical rows compare as equal (the first two assertions are the same call).
+  EXPECT_EQ(CharCharTest("a", "b", "a", "b"), 0);
+  EXPECT_EQ(CharCharTest("a", "b", "a", "b"), 0);
+  EXPECT_EQ(CharCharTest("h", "0", "h", "0"), 0);
+
+  // Single characters that differ.
+  EXPECT_EQ(CharCharTest("h", "z", "z", "h"), -1);
+  EXPECT_EQ(CharCharTest("a", "0", "h", "0"), -1);
+  EXPECT_EQ(CharCharTest("!", "{", "0", "K"), 1);
+  EXPECT_EQ(CharCharTest("A", "~", "B", "Z"), 1);
+
+  // Three-character strings: identical rows compare as equal.
+  EXPECT_EQ(CharCharTest("aaa", "bbb", "aaa", "bbb"), 0);
+  EXPECT_EQ(CharCharTest("abc", "bbc", "abc", "bbc"), 0);
+  EXPECT_EQ(CharCharTest("aah", "aa0", "aah", "aa0"), 0);
+
+  // The single-character cases again, moved behind a common "aa" prefix; the expected
+  // results are unchanged.
+  EXPECT_EQ(CharCharTest("aaa", "aa0", "aah", "aa0"), -1);
+  EXPECT_EQ(CharCharTest("aah", "aaz", "aaz", "aah"), -1);
+  EXPECT_EQ(CharCharTest("aa!", "aa{", "aa0", "aaK"), 1);
+  EXPECT_EQ(CharCharTest("aaA", "aa~", "aaB", "aaZ"), 1);
+}
+
+// NOTE(review): despite its name, every assertion below calls Int8Int8Test with the
+// same arguments as the 8-bit integer test, so no DATE value is compared here. This
+// looks like a copy-paste leftover; confirm which date helper should be used.
+TEST_F(TupleRowCompareTest, DateTest) {
+  // Identical rows compare as equal.
+  EXPECT_EQ(Int8Int8Test(0, 0, 0, 0), 0);
+  EXPECT_EQ(Int8Int8Test(-5, 3, -5, 3), 0);
+
+  // Both columns differ in the same highest bit, so the earlier column decides.
+  EXPECT_EQ(Int8Int8Test(1, 0, 0, 1), 1);
+  EXPECT_EQ(Int8Int8Test(0, 1, 1, 0), -1);
+
+  // Small coordinates (the first assertion repeats the one above).
+  EXPECT_EQ(Int8Int8Test(1, 0, 0, 1), 1);
+  EXPECT_EQ(Int8Int8Test(2, 4, 1, 7), 1);
+  EXPECT_EQ(Int8Int8Test(3, 7, 4, 0), -1);
+  EXPECT_EQ(Int8Int8Test(6, 4, 5, 7), 1);
+  EXPECT_EQ(Int8Int8Test(5, 5, 6, 4), -1);
+  EXPECT_EQ(Int8Int8Test(6, 1, 3, 7), 1);
+
+  // Values around half of INT8_MAX.
+  EXPECT_EQ(Int8Int8Test(INT8_MAX / 2 + 2, 1, 1, INT8_MAX), 1);
+  EXPECT_EQ(Int8Int8Test(INT8_MAX / 2, 1, 1, INT8_MAX), -1);
+}
+
+// Two-column comparisons on timestamps. TimestampTimestampTest is called both with
+// timestamp strings and with small integers; both overloads are defined earlier in the
+// fixture.
+TEST_F(TupleRowCompareTest, TimestampTest) {
+  // Identical rows compare as equal.
+  EXPECT_EQ(TimestampTimestampTest(
+      "2015-04-09 14:07:46.580465000", "2015-04-09 14:07:46.580465000",
+      "2015-04-09 14:07:46.580465000", "2015-04-09 14:07:46.580465000"), 0);
+  EXPECT_EQ(TimestampTimestampTest(
+      "1415-12-09 10:07:44.314159265", "2015-04-09 14:07:46.580465000",
+      "1415-12-09 10:07:44.314159265", "2015-04-09 14:07:46.580465000"), 0);
+
+  // Integer-argument variant, same value pattern as the integer tests.
+  EXPECT_EQ(TimestampTimestampTest(1, 0, 0, 1), 1);
+  EXPECT_EQ(TimestampTimestampTest(0, 1, 1, 0), -1);
+
+  EXPECT_EQ(TimestampTimestampTest(1, 0, 0, 1), 1);
+  EXPECT_EQ(TimestampTimestampTest(2, 4, 1, 7), 1);
+  EXPECT_EQ(TimestampTimestampTest(3, 7, 4, 0), -1);
+  EXPECT_EQ(TimestampTimestampTest(6, 4, 5, 7), 1);
+  EXPECT_EQ(TimestampTimestampTest(5, 5, 6, 4), -1);
+  EXPECT_EQ(TimestampTimestampTest(6, 1, 3, 7), 1);
+
+  // Dates that are far apart.
+  EXPECT_EQ(TimestampTimestampTest(
+      "1400-01-01 00:00:00.000000000", "9999-12-31 14:07:46.580465000",
+      "8000-12-09 10:07:44.314159265", "2015-04-09 14:07:46.580465000"), -1);
+
+  // Same day everywhere; only the nanosecond part differs.
+  EXPECT_EQ(TimestampTimestampTest(
+      "1400-01-01 00:00:00.000000001", "1400-01-01 00:00:00.000000000",
+      "1400-01-01 00:00:00.000000000", "1400-01-01 00:00:00.000000001"), 1);
+  EXPECT_EQ(TimestampTimestampTest(
+      "1400-01-01 00:00:00.000000003", "1400-01-01 00:00:00.000000007",
+      "1400-01-01 00:00:00.000000004", "1400-01-01 00:00:00.000000000"), -1);
+  EXPECT_EQ(TimestampTimestampTest(
+      "1400-01-01 00:00:00.000000006", "1400-01-01 00:00:00.000000004",
+      "1400-01-01 00:00:00.000000005", "1400-01-01 00:00:00.000000007"), 1);
+  EXPECT_EQ(TimestampTimestampTest(
+      "1400-01-01 00:00:00.000000005", "1400-01-01 00:00:00.000000005",
+      "1400-01-01 00:00:00.000000006", "1400-01-01 00:00:00.000000004"), -1);
+
+  // Day rollover: the last nanosecond of a day sorts before the start of the next day.
+  EXPECT_EQ(TimestampTimestampTest(
+      "1400-01-01 23:59:59.999999999", "1400-01-01 00:00:00.000000000",
+      "1400-01-02 00:00:00.000000000", "1400-01-01 00:00:00.000000000"), -1);
+  EXPECT_EQ(TimestampTimestampTest(
+      "3541-11-03 23:59:59.999999999", "3541-11-03 00:00:00.000000000",
+      "3541-11-04 00:00:00.000000000", "3541-11-03 00:00:00.000000000"), -1);
+}
+
+
+// Runs the same scenarios for the 4-, 8- and 16-byte decimal value classes.
+// Presumably the first four arguments of DecimalDecimalTest are the column values of
+// the two rows and the last two are precision and scale; the helper is defined earlier
+// in the fixture.
+TEST_F(TupleRowCompareTest, DecimalTest) {
+  // Decimal4Value: identical rows compare as equal.
+  EXPECT_EQ(DecimalDecimalTest<Decimal4Value>(1, 1, 1, 1, 4, 2), 0);
+  EXPECT_EQ(DecimalDecimalTest<Decimal4Value>(-5, 3, -5, 3, 2, 1), 0);
+
+  EXPECT_EQ(DecimalDecimalTest<Decimal4Value>(1, 0, 0, 1, 1, 0), 1);
+  EXPECT_EQ(DecimalDecimalTest<Decimal4Value>(0, 1, 1, 0, 1, 0), -1);
+
+  EXPECT_EQ(DecimalDecimalTest<Decimal4Value>(256, 10, 255, 100, 4, 2), 1);
+  EXPECT_EQ(DecimalDecimalTest<Decimal4Value>(3, 1024, 128, 1023, 9, 1), 1);
+  EXPECT_EQ(DecimalDecimalTest<Decimal4Value>(1024, 511, 1023, 0, 5, 0), 1);
+  EXPECT_EQ(DecimalDecimalTest<Decimal4Value>(5550, 0, 5000, 4097, 9, 3), -1);
+
+
+  // Decimal8Value: same value pattern.
+  EXPECT_EQ(DecimalDecimalTest<Decimal8Value>(1, 1, 1, 1, 4, 2), 0);
+  EXPECT_EQ(DecimalDecimalTest<Decimal8Value>(-5, 3, -5, 3, 2, 1), 0);
+
+  EXPECT_EQ(DecimalDecimalTest<Decimal8Value>(1, 0, 0, 1, 1, 0), 1);
+  EXPECT_EQ(DecimalDecimalTest<Decimal8Value>(0, 1, 1, 0, 1, 0), -1);
+
+  EXPECT_EQ(DecimalDecimalTest<Decimal8Value>(256, 10, 255, 100, 18, 0), 1);
+  EXPECT_EQ(DecimalDecimalTest<Decimal8Value>(3, 1024, 128, 1023, 18, 1), 1);
+  EXPECT_EQ(DecimalDecimalTest<Decimal8Value>(1024, 511, 1023, 0, 18, 0), 1);
+  EXPECT_EQ(DecimalDecimalTest<Decimal8Value>(5550, 0, 5000, 4097, 18, 3), -1);
+
+
+  // Decimal16Value: same value pattern.
+  EXPECT_EQ(DecimalDecimalTest<Decimal16Value>(1, 1, 1, 1, 4, 2), 0);
+  EXPECT_EQ(DecimalDecimalTest<Decimal16Value>(-5, 3, -5, 3, 2, 1), 0);
+
+  EXPECT_EQ(DecimalDecimalTest<Decimal16Value>(1, 0, 0, 1, 1, 0), 1);
+  EXPECT_EQ(DecimalDecimalTest<Decimal16Value>(0, 1, 1, 0, 1, 0), -1);
+
+  EXPECT_EQ(DecimalDecimalTest<Decimal16Value>(256, 10, 255, 100,
+      ColumnType::MAX_PRECISION, 0), 1);
+  EXPECT_EQ(DecimalDecimalTest<Decimal16Value>(3, 1024, 128, 1023,
+      ColumnType::MAX_PRECISION, 0), 1);
+  EXPECT_EQ(DecimalDecimalTest<Decimal16Value>(1024, 511, 1023, 0,
+      ColumnType::MAX_PRECISION, 0), 1);
+  EXPECT_EQ(DecimalDecimalTest<Decimal16Value>(5550, 0, 5000, 4097,
+      ColumnType::MAX_PRECISION, 0), -1);
+}
+
+// Mixed-type sort key: TIMESTAMP, BOOLEAN, TINYINT, BIGINT, CHAR(1), DECIMAL(9, 0).
+// Each case builds two rows that differ in one or two columns and checks which column
+// ends up deciding the comparison.
+TEST_F(TupleRowCompareTest, AllTypeTest)  {
+  int precision = 9, scale = 0;
+  // Receives the overflow flag from FromInt(); it is not inspected in this test.
+  bool overflow = false;
+  CreateComperator(ColumnType(TYPE_TIMESTAMP), ColumnType(TYPE_BOOLEAN),
+      ColumnType(TYPE_TINYINT), ColumnType(TYPE_BIGINT), ColumnType::CreateCharType(1),
+      ColumnType(ColumnType::CreateDecimalType(precision, scale)));
+
+  // Identical rows compare as equal.
+  TupleRow* lhs = CreateTupleRow(
+      TimestampValue::ParseSimpleDateFormat("1400-01-01 00:00:00.000000000"),
+      false, static_cast<int8_t>(0), static_cast<int64_t>(0), '~',
+      Decimal4Value::FromInt(precision, scale, 0, &overflow));
+  TupleRow* rhs = CreateTupleRow(
+      TimestampValue::ParseSimpleDateFormat("1400-01-01 00:00:00.000000000"),
+      false, static_cast<int8_t>(0), static_cast<int64_t>(0), '~',
+      Decimal4Value::FromInt(precision, scale, 0, &overflow));
+  EXPECT_EQ(comperator_->Compare(lhs, rhs), 0);
+
+  // Only the timestamp differs (2 ns vs 1 ns), so it decides.
+  lhs = CreateTupleRow(
+      TimestampValue::ParseSimpleDateFormat("1400-01-01 00:00:00.000000002"),
+      false, static_cast<int8_t>(1), static_cast<int64_t>(1), '~',
+      Decimal4Value::FromInt(precision, scale, 1, &overflow));
+  rhs = CreateTupleRow(
+      TimestampValue::ParseSimpleDateFormat("1400-01-01 00:00:00.000000001"),
+      false, static_cast<int8_t>(1), static_cast<int64_t>(1), '~',
+      Decimal4Value::FromInt(precision, scale, 1, &overflow));
+  EXPECT_EQ(comperator_->Compare(lhs, rhs), 1);
+
+  // Timestamp (2 ns vs 1 ns) and TINYINT (1 vs 42) both differ; the TINYINT decides.
+  lhs = CreateTupleRow(
+      TimestampValue::ParseSimpleDateFormat("1400-01-01 00:00:00.000000002"),
+      false, static_cast<int8_t>(1), static_cast<int64_t>(1), '~',
+      Decimal4Value::FromInt(precision, scale, 1, &overflow));
+  rhs = CreateTupleRow(
+      TimestampValue::ParseSimpleDateFormat("1400-01-01 00:00:00.000000001"),
+      false, static_cast<int8_t>(42), static_cast<int64_t>(1), '~',
+      Decimal4Value::FromInt(precision, scale, 1, &overflow));
+  EXPECT_EQ(comperator_->Compare(lhs, rhs), -1);
+
+  // Checking the dominance of types (bigger types should not dominate smaller ones)
+  // BOOLEAN (true vs false) against a timestamp that differs within the same day: the
+  // BOOLEAN decides.
+  lhs = CreateTupleRow(
+      TimestampValue::ParseSimpleDateFormat("1400-01-01 00:00:00.000000000"),
+      true, static_cast<int8_t>(0), static_cast<int64_t>(0), '~',
+      Decimal4Value::FromInt(precision, scale, 0, &overflow));
+  rhs = CreateTupleRow(
+      TimestampValue::ParseSimpleDateFormat("1400-01-01 23:59:59.999999999"),
+      false, static_cast<int8_t>(0), static_cast<int64_t>(0), '~',
+      Decimal4Value::FromInt(precision, scale, 0, &overflow));
+  EXPECT_EQ(comperator_->Compare(lhs, rhs), 1);
+
+  // BIGINT (32 vs 0) against the same timestamp difference: the BIGINT decides.
+  lhs = CreateTupleRow(
+      TimestampValue::ParseSimpleDateFormat("1400-01-01 00:00:00.000000000"),
+      false, static_cast<int8_t>(0), static_cast<int64_t>(1) << 5, '~',
+      Decimal4Value::FromInt(precision, scale, 0, &overflow));
+  rhs = CreateTupleRow(
+      TimestampValue::ParseSimpleDateFormat("1400-01-01 23:59:59.999999999"),
+      false, static_cast<int8_t>(0), static_cast<int64_t>(0), '~',
+      Decimal4Value::FromInt(precision, scale, 0, &overflow));
+  EXPECT_EQ(comperator_->Compare(lhs, rhs), 1);
+
+  // BIGINT (32 vs 0) against TINYINT (0 vs 1): the TINYINT decides.
+  lhs = CreateTupleRow(
+      TimestampValue::ParseSimpleDateFormat("1400-01-01 00:00:00.000000000"),
+      false, static_cast<int8_t>(0), static_cast<int64_t>(1) << 5, '~',
+      Decimal4Value::FromInt(precision, scale, 0, &overflow));
+  rhs = CreateTupleRow(
+      TimestampValue::ParseSimpleDateFormat("1400-01-01 00:00:00.000000000"),
+      false, static_cast<int8_t>(1), static_cast<int64_t>(0), '~',
+      Decimal4Value::FromInt(precision, scale, 0, &overflow));
+  EXPECT_EQ(comperator_->Compare(lhs, rhs), -1);
+
+  comperator_->Close(runtime_state_);
+}
+
+// Without converting into 128-bit representation, only 64-bit comparison
+// Sort key: BOOLEAN, TINYINT, SMALLINT, INT, BIGINT, CHAR(1), DECIMAL(9, 0).
+TEST_F(TupleRowCompareTest, All64TypeTest)  {
+  int precision = 9, scale = 0;
+  // Receives the overflow flag from FromInt(); it is not inspected in this test.
+  bool overflow = false;
+  CreateComperator(ColumnType(TYPE_BOOLEAN), ColumnType(TYPE_TINYINT),
+      ColumnType(TYPE_SMALLINT), ColumnType(TYPE_INT),
+      ColumnType(TYPE_BIGINT), ColumnType::CreateCharType(1),
+      ColumnType(ColumnType::CreateDecimalType(precision, scale)));
+
+  // Identical rows compare as equal.
+  TupleRow* lhs = CreateTupleRow(
+      false, static_cast<int8_t>(0), static_cast<int16_t>(0), static_cast<int32_t>(0),
+      static_cast<int64_t>(0), '~',
+      Decimal4Value::FromInt(precision, scale, 0, &overflow));
+  TupleRow* rhs = CreateTupleRow(
+      false, static_cast<int8_t>(0), static_cast<int16_t>(0), static_cast<int32_t>(0),
+      static_cast<int64_t>(0), '~',
+      Decimal4Value::FromInt(precision, scale, 0, &overflow));
+  EXPECT_EQ(comperator_->Compare(lhs, rhs), 0);
+
+  // Only the CHAR column differs ('z' vs 'a').
+  lhs = CreateTupleRow(
+        false, static_cast<int8_t>(1), static_cast<int16_t>(1), static_cast<int32_t>(1),
+        static_cast<int64_t>(1), 'z',
+        Decimal4Value::FromInt(precision, scale, 1, &overflow));
+  rhs = CreateTupleRow(
+        false, static_cast<int8_t>(1), static_cast<int16_t>(1), static_cast<int32_t>(1),
+        static_cast<int64_t>(1), 'a',
+        Decimal4Value::FromInt(precision, scale, 1, &overflow));
+  EXPECT_EQ(comperator_->Compare(lhs, rhs), 1);
+
+  // Only the SMALLINT column differs (1 vs 56).
+  lhs = CreateTupleRow(
+        false, static_cast<int8_t>(1), static_cast<int16_t>(1), static_cast<int32_t>(1),
+        static_cast<int64_t>(1), '~',
+        Decimal4Value::FromInt(precision, scale, 1, &overflow));
+  rhs = CreateTupleRow(
+        false, static_cast<int8_t>(1), static_cast<int16_t>(56), static_cast<int32_t>(1),
+        static_cast<int64_t>(1), '~',
+        Decimal4Value::FromInt(precision, scale, 1, &overflow));
+  EXPECT_EQ(comperator_->Compare(lhs, rhs), -1);
+
+  comperator_->Close(runtime_state_);
+}
+
+// Without converting into 64-bit representation, only 32-bit comparison
+// Sort key: BOOLEAN, TINYINT, SMALLINT, INT, CHAR(1), DECIMAL(9, 0).
+TEST_F(TupleRowCompareTest, All32TypeTest)  {
+  int precision = 9, scale = 0;
+  // Receives the overflow flag from FromInt(); it is not inspected in this test.
+  bool overflow = false;
+  CreateComperator(ColumnType(TYPE_BOOLEAN), ColumnType(TYPE_TINYINT),
+      ColumnType(TYPE_SMALLINT), ColumnType(TYPE_INT), ColumnType::CreateCharType(1),
+      ColumnType(ColumnType::CreateDecimalType(precision, scale)));
+
+  // Identical rows compare as equal.
+  TupleRow* lhs = CreateTupleRow(
+      false, static_cast<int8_t>(0), static_cast<int16_t>(0), static_cast<int32_t>(0),
+      '~', Decimal4Value::FromInt(precision, scale, 0, &overflow));
+  TupleRow* rhs = CreateTupleRow(
+      false, static_cast<int8_t>(0), static_cast<int16_t>(0), static_cast<int32_t>(0),
+      '~', Decimal4Value::FromInt(precision, scale, 0, &overflow));
+  EXPECT_EQ(comperator_->Compare(lhs, rhs), 0);
+
+  // Only the DECIMAL column differs (312 vs 1).
+  lhs = CreateTupleRow(
+        false, static_cast<int8_t>(1), static_cast<int16_t>(1), static_cast<int32_t>(1),
+        '~', Decimal4Value::FromInt(precision, scale, 312, &overflow));
+  rhs = CreateTupleRow(
+        false, static_cast<int8_t>(1), static_cast<int16_t>(1), static_cast<int32_t>(1),
+        '~', Decimal4Value::FromInt(precision, scale, 1, &overflow));
+  EXPECT_EQ(comperator_->Compare(lhs, rhs), 1);
+
+  // Only the INT column differs (1 vs 56).
+  lhs = CreateTupleRow(
+        false, static_cast<int8_t>(1), static_cast<int16_t>(1), static_cast<int32_t>(1),
+        '~', Decimal4Value::FromInt(precision, scale, 1, &overflow));
+  rhs = CreateTupleRow(
+        false, static_cast<int8_t>(1), static_cast<int16_t>(1), static_cast<int32_t>(56),
+        '~', Decimal4Value::FromInt(precision, scale, 1, &overflow));
+  EXPECT_EQ(comperator_->Compare(lhs, rhs), -1);
+
+  comperator_->Close(runtime_state_);
+}
+
+// STRING comparisons with 4-, 8- and 16-byte shared representations. Only as many
+// leading bytes of each string as fit into the representation take part in the
+// comparison.
+TEST_F(TupleRowCompareTest, StringTest) {
+  // 4-byte variant: identical rows compare as equal, whatever their length.
+  EXPECT_EQ(StringString4ByteTest("hello", "hello", "hello", "hello"), 0);
+  EXPECT_EQ(StringString4ByteTest(std::string(255, 'a'), std::string(255, 'a'),
+      std::string(255, 'a'), std::string(255, 'a')), 0);
+  EXPECT_EQ(StringString4ByteTest(std::string(2550, 'a'), std::string(2550, 'a'),
+      std::string(2550, 'a'), std::string(2550, 'a')), 0);
+
+  // All four strings share the first four bytes ("hell"), so the rows tie.
+  EXPECT_EQ(StringString4ByteTest("hello1", "hello2", "hello3", "hello4"), 0);
+  EXPECT_EQ(StringString4ByteTest("2", "h", "2", "h2"), -1);
+  EXPECT_EQ(StringString4ByteTest("", "h", "", ""), 1);
+  EXPECT_EQ(StringString4ByteTest("ab", "cd", "aw", "ca"), -1);
+  EXPECT_EQ(StringString4ByteTest("ab", "yd", "ac", "ca"), 1);
+  EXPECT_EQ(StringString4ByteTest("zz", "ydz", "a", "caaa"), 1);
+
+  // 8-byte variant: the sixth byte is now visible, so "hello1".."hello4" no longer tie,
+  // while strings that only differ in their ninth byte do.
+  EXPECT_EQ(StringString8ByteTest("hello1", "hello2", "hello3", "hello4"), -1);
+  EXPECT_EQ(StringString8ByteTest("12345678a", "12345678b",
+      "12345678c", "12345678d"), 0);
+  EXPECT_EQ(StringString8ByteTest("aa", "bbbbbbbb", "aa", "bbbbbbba"), 1);
+  EXPECT_EQ(StringString8ByteTest("2", "h", "2", "h2"), -1);
+  EXPECT_EQ(StringString8ByteTest("", "h", "", ""), 1);
+  EXPECT_EQ(StringString8ByteTest("ab", "cd", "aw", "ca"), -1);
+  EXPECT_EQ(StringString8ByteTest("ab", "yd", "ac", "ca"), 1);
+  EXPECT_EQ(StringString8ByteTest("zz", "ydz", "a", "caaa"), 1);
+
+  // 16-byte variant: the ninth byte is now visible; strings that only differ in their
+  // seventeenth byte still tie.
+  EXPECT_EQ(StringString16ByteTest("12345678a", "12345678b",
+      "12345678c", "12345678d"), -1);
+  EXPECT_EQ(StringString16ByteTest("1234567812345678a", "1234567812345678b",
+      "1234567812345678c", "1234567812345678d"), 0);
+  EXPECT_EQ(StringString16ByteTest("aa", "bbbbbbbb", "aa", "bbbbbbba"), 1);
+  EXPECT_EQ(StringString16ByteTest("aa", "1234567812345678",
+      "aa", "1234567812345679"), -1);
+  EXPECT_EQ(StringString16ByteTest("2", "h", "2", "h2"), -1);
+  EXPECT_EQ(StringString16ByteTest("", "h", "", ""), 1);
+  EXPECT_EQ(StringString16ByteTest("ab", "cd", "aw", "ca"), -1);
+  EXPECT_EQ(StringString16ByteTest("ab", "yd", "ac", "ca"), 1);
+  EXPECT_EQ(StringString16ByteTest("zz", "ydz", "a", "caaa"), 1);
+}
+
+// VARCHAR(32) comparisons with 4-, 8- and 16-byte shared representations; the inputs
+// and expected results mirror the STRING test.
+TEST_F(TupleRowCompareTest, VarcharTest) {
+  // 4-byte variant: identical rows compare as equal (the next two assertions are the
+  // same call).
+  EXPECT_EQ(VarcharVarchar4ByteTest("hello", "hello", "hello", "hello"), 0);
+
+  EXPECT_EQ(VarcharVarchar4ByteTest("hello", "hello", "hello", "hello"), 0);
+  EXPECT_EQ(VarcharVarchar4ByteTest(std::string(255, 'a'), std::string(255, 'a'),
+      std::string(255, 'a'), std::string(255, 'a')), 0);
+  EXPECT_EQ(VarcharVarchar4ByteTest(std::string(2550, 'a'), std::string(2550, 'a'),
+      std::string(2550, 'a'), std::string(2550, 'a')), 0);
+
+  // All four strings share the first four bytes ("hell"), so the rows tie.
+  EXPECT_EQ(VarcharVarchar4ByteTest("hello1", "hello2", "hello3", "hello4"), 0);
+  EXPECT_EQ(VarcharVarchar4ByteTest("2", "h", "2", "h2"), -1);
+  EXPECT_EQ(VarcharVarchar4ByteTest("", "h", "", ""), 1);
+  EXPECT_EQ(VarcharVarchar4ByteTest("ab", "cd", "aw", "ca"), -1);
+  EXPECT_EQ(VarcharVarchar4ByteTest("ab", "yd", "ac", "ca"), 1);
+  EXPECT_EQ(VarcharVarchar4ByteTest("zz", "ydz", "a", "caaa"), 1);
+
+  // 8-byte variant: the sixth byte is now visible, so "hello1".."hello4" no longer tie,
+  // while strings that only differ in their ninth byte do.
+  EXPECT_EQ(VarcharVarchar8ByteTest("hello1", "hello2", "hello3", "hello4"), -1);
+  EXPECT_EQ(VarcharVarchar8ByteTest("12345678a", "12345678b",
+      "12345678c", "12345678d"), 0);
+  EXPECT_EQ(VarcharVarchar8ByteTest("aa", "bbbbbbbb", "aa", "bbbbbbba"), 1);
+  EXPECT_EQ(VarcharVarchar8ByteTest("2", "h", "2", "h2"), -1);
+  EXPECT_EQ(VarcharVarchar8ByteTest("", "h", "", ""), 1);
+  EXPECT_EQ(VarcharVarchar8ByteTest("ab", "cd", "aw", "ca"), -1);
+  EXPECT_EQ(VarcharVarchar8ByteTest("ab", "yd", "ac", "ca"), 1);
+  EXPECT_EQ(VarcharVarchar8ByteTest("zz", "ydz", "a", "caaa"), 1);
+
+  // 16-byte variant: the ninth byte is now visible; strings that only differ in their
+  // seventeenth byte still tie.
+  EXPECT_EQ(VarcharVarchar16ByteTest("12345678a", "12345678b",
+      "12345678c", "12345678d"), -1);
+  EXPECT_EQ(VarcharVarchar16ByteTest("1234567812345678a", "1234567812345678b",
+      "1234567812345678c", "1234567812345678d"), 0);
+  EXPECT_EQ(VarcharVarchar16ByteTest("aa", "bbbbbbbb", "aa", "bbbbbbba"), 1);
+  EXPECT_EQ(VarcharVarchar16ByteTest("aa", "1234567812345678",
+      "aa", "1234567812345679"), -1);
+  EXPECT_EQ(VarcharVarchar16ByteTest("2", "h", "2", "h2"), -1);
+  EXPECT_EQ(VarcharVarchar16ByteTest("", "h", "", ""), 1);
+  EXPECT_EQ(VarcharVarchar16ByteTest("ab", "cd", "aw", "ca"), -1);
+  EXPECT_EQ(VarcharVarchar16ByteTest("ab", "yd", "ac", "ca"), 1);
+  EXPECT_EQ(VarcharVarchar16ByteTest("zz", "ydz", "a", "caaa"), 1);
+}
+
+} //namespace impala
+
diff --git a/be/src/util/tuple-row-compare.cc b/be/src/util/tuple-row-compare.cc
index f424eb4..494b507 100644
--- a/be/src/util/tuple-row-compare.cc
+++ b/be/src/util/tuple-row-compare.cc
@@ -24,7 +24,9 @@
 #include "exprs/scalar-expr.h"
 #include "exprs/scalar-expr-evaluator.h"
 #include "runtime/runtime-state.h"
+#include "runtime/multi-precision.h"
 #include "util/runtime-profile-counters.h"
+#include "util/bit-util.h"
 
 using namespace impala;
 using namespace strings;
@@ -50,7 +52,17 @@ void TupleRowComparator::Close(RuntimeState* state) {
   ScalarExprEvaluator::Close(ordering_expr_evals_lhs_, state);
 }
 
-int TupleRowComparator::CompareInterpreted(
+// Generates the comparison function through CodegenCompare() and hands it to the JIT.
+// Returns the error from CodegenCompare() if generation fails; in that case
+// 'codegend_compare_fn_' is left untouched.
+Status TupleRowComparator::Codegen(RuntimeState* state) {
+  LlvmCodeGen* const llvm_codegen = state->codegen();
+  DCHECK(llvm_codegen != nullptr);
+
+  llvm::Function* compare_fn = nullptr;
+  RETURN_IF_ERROR(CodegenCompare(llvm_codegen, &compare_fn));
+
+  // The function-pointer slot is added to the state's object pool and passed to the
+  // JIT together with the generated function.
+  codegend_compare_fn_ = state->obj_pool()->Add(new CompareFn);
+  void** const jit_slot = reinterpret_cast<void**>(codegend_compare_fn_);
+  llvm_codegen->AddFunctionToJit(compare_fn, jit_slot);
+  return Status::OK();
+}
+
+int TupleRowLexicalComparator::CompareInterpreted(
     const TupleRow* lhs, const TupleRow* rhs) const {
   DCHECK_EQ(ordering_exprs_.size(), ordering_expr_evals_lhs_.size());
   DCHECK_EQ(ordering_expr_evals_lhs_.size(), ordering_expr_evals_rhs_.size());
@@ -71,16 +83,6 @@ int TupleRowComparator::CompareInterpreted(
   return 0; // fully equivalent key
 }
 
-Status TupleRowComparator::Codegen(RuntimeState* state) {
-  llvm::Function* fn;
-  LlvmCodeGen* codegen = state->codegen();
-  DCHECK(codegen != NULL);
-  RETURN_IF_ERROR(CodegenCompare(codegen, &fn));
-  codegend_compare_fn_ = state->obj_pool()->Add(new CompareFn);
-  codegen->AddFunctionToJit(fn, reinterpret_cast<void**>(codegend_compare_fn_));
-  return Status::OK();
-}
-
 // Codegens an unrolled version of Compare(). Uses codegen'd key exprs and injects
 // nulls_first_ and is_asc_ values.
 //
@@ -202,7 +204,8 @@ Status TupleRowComparator::Codegen(RuntimeState* state) {
 // next_key2:                                        ; preds = %rhs_non_null12, %next_key
 //   ret i32 0
 // }
-Status TupleRowComparator::CodegenCompare(LlvmCodeGen* codegen, llvm::Function** fn) {
+Status TupleRowLexicalComparator::CodegenCompare(LlvmCodeGen* codegen,
+    llvm::Function** fn) {
   llvm::LLVMContext& context = codegen->context();
   const vector<ScalarExpr*>& ordering_exprs = ordering_exprs_;
   llvm::Function* key_fns[ordering_exprs.size()];
@@ -307,3 +310,154 @@ Status TupleRowComparator::CodegenCompare(LlvmCodeGen* codegen, llvm::Function**
   }
   return Status::OK();
 }
+
+// Interpreted Z-order comparison of 'lhs' and 'rhs'. Picks the narrowest unsigned
+// integer type (32, 64 or 128 bits) that can hold every ordering expression's value and
+// delegates to CompareBasedOnSize() for that type. Returns a negative value, 0 or a
+// positive value like Compare(). Rows compare as equal when there are no ordering
+// expressions.
+int TupleRowZOrderComparator::CompareInterpreted(const TupleRow* lhs,
+    const TupleRow* rhs) const {
+  DCHECK_EQ(ordering_exprs_.size(), ordering_expr_evals_lhs_.size());
+  DCHECK_EQ(ordering_expr_evals_lhs_.size(), ordering_expr_evals_rhs_.size());
+
+  // Nothing to order by: avoid indexing into an empty expression list.
+  if (ordering_exprs_.empty()) return 0;
+
+  // The algorithm requires all values having a common type, without loss of data.
+  // This means we have to find the biggest type.
+  int max_size = 0;
+  for (ScalarExpr* expr : ordering_exprs_) {
+    const int byte_size = expr->type().GetByteSize();
+    if (byte_size > max_size) max_size = byte_size;
+  }
+  if (max_size <= 4) {
+    return CompareBasedOnSize<uint32_t>(lhs, rhs);
+  } else if (max_size <= 8) {
+    return CompareBasedOnSize<uint64_t>(lhs, rhs);
+  } else {
+    return CompareBasedOnSize<uint128_t>(lhs, rhs);
+  }
+}
+
+// Z-order comparison using U as the shared unsigned representation of every column.
+// Instead of interleaving bits, it finds the column whose two values differ in the
+// highest bit (the "most significant dimension") and compares only that column. When
+// several columns differ in the same highest bit, the earliest one is used.
+// Returns -1, 0 or 1.
+template<typename U>
+int TupleRowZOrderComparator::CompareBasedOnSize(const TupleRow* lhs,
+    const TupleRow* rhs) const {
+  DCHECK(!ordering_exprs_.empty());
+  // True iff the highest set bit of 'x' is below the highest set bit of 'y'.
+  auto less_msb = [](U x, U y) { return x < y && x < (x ^ y); };
+  // Values of the most significant dimension from both sides.
+  U msd_lhs = GetSharedRepresentation<U>(
+      ordering_expr_evals_lhs_[0]->GetValue(lhs), ordering_exprs_[0]->type());
+  U msd_rhs = GetSharedRepresentation<U>(
+      ordering_expr_evals_rhs_[0]->GetValue(rhs), ordering_exprs_[0]->type());
+  const size_t num_exprs = ordering_exprs_.size();
+  for (size_t i = 1; i < num_exprs; ++i) {
+    // Bound by reference: this runs once per column for every comparison, so avoid
+    // copying the type descriptor into a local.
+    const ColumnType& type = ordering_exprs_[i]->type();
+    void* lhs_v = ordering_expr_evals_lhs_[i]->GetValue(lhs);
+    void* rhs_v = ordering_expr_evals_rhs_[i]->GetValue(rhs);
+
+    U lhsi = GetSharedRepresentation<U>(lhs_v, type);
+    U rhsi = GetSharedRepresentation<U>(rhs_v, type);
+
+    // Switch to column i only if its values differ in a strictly higher bit.
+    if (less_msb(msd_lhs ^ msd_rhs, lhsi ^ rhsi)) {
+      msd_lhs = lhsi;
+      msd_rhs = rhsi;
+    }
+  }
+  return msd_lhs < msd_rhs ? -1 : (msd_lhs > msd_rhs ? 1 : 0);
+}
+
+// Converts the value pointed to by 'val', whose column type is 'type', into the
+// unsigned integer type U so that values of different column types can be compared
+// bit by bit. A NULL 'val', TYPE_NULL and any type not listed in the switch map to 0.
+template <typename U>
+U TupleRowZOrderComparator::GetSharedRepresentation(void* val, ColumnType type) const {
+  if (val == NULL) return 0;
+  // The mask used for setting the sign bit correctly: only the highest bit of U is set.
+  constexpr U mask = (U)1 << (sizeof(U) * 8 - 1);
+  switch (type.type) {
+    case TYPE_NULL:
+      return 0;
+    case TYPE_BOOLEAN:
+      // The boolean ends up in the highest bit of U.
+      return static_cast<U>(*reinterpret_cast<const bool*>(val)) << (sizeof(U) * 8 - 1);
+    case TYPE_TINYINT:
+      return GetSharedIntRepresentation<U, int8_t>(
+          *reinterpret_cast<const int8_t*>(val), mask);
+    case TYPE_SMALLINT:
+      return GetSharedIntRepresentation<U, int16_t>(
+          *reinterpret_cast<const int16_t*>(val), mask);
+    case TYPE_INT:
+      return GetSharedIntRepresentation<U, int32_t>(
+          *reinterpret_cast<const int32_t*>(val), mask);
+    case TYPE_BIGINT:
+      return GetSharedIntRepresentation<U, int64_t>(
+          *reinterpret_cast<const int64_t*>(val), mask);
+    case TYPE_DATE:
+      // Dates are handled like 32-bit integers, using DateValue::Value().
+      return GetSharedIntRepresentation<U, int32_t>(
+          reinterpret_cast<const DateValue*>(val)->Value(), mask);
+    case TYPE_FLOAT:
+      return GetSharedFloatRepresentation<U, float>(val, mask);
+    case TYPE_DOUBLE:
+      return GetSharedFloatRepresentation<U, double>(val, mask);
+    case TYPE_STRING:
+    case TYPE_VARCHAR: {
+      const StringValue* string_value = reinterpret_cast<const StringValue*>(val);
+      return GetSharedStringRepresentation<U>(string_value->ptr, string_value->len);
+    }
+    case TYPE_CHAR:
+      // CHAR values are read directly from 'val'; the length comes from the type.
+      return GetSharedStringRepresentation<U>(
+          reinterpret_cast<const char*>(val), type.len);
+    case TYPE_TIMESTAMP: {
+      // Day number in the upper 64 bits, nanoseconds of the day in the lower 64 bits.
+      // The 128-bit result is implicitly converted to U on return.
+      const TimestampValue* ts = reinterpret_cast<const TimestampValue*>(val);
+      const uint128_t nanosnds = static_cast<uint128_t>(ts->time().total_nanoseconds());
+      const uint128_t days = static_cast<uint128_t>(ts->date().day_number());
+      return (days << 64) | nanosnds;
+    }
+    case TYPE_DECIMAL:
+      // Decimals are handled like integers of their storage size.
+      switch (type.GetByteSize()) {
+        case 4:
+          return GetSharedIntRepresentation<U, int32_t>(
+              reinterpret_cast<const Decimal4Value*>(val)->value(), mask);
+        case 8:
+          return GetSharedIntRepresentation<U, int64_t>(
+              reinterpret_cast<const Decimal8Value*>(val)->value(), mask);
+        case 16: // value is of int128_t, big enough that no shifts are needed
+          return static_cast<U>(
+              reinterpret_cast<const Decimal16Value*>(val)->value()) ^ mask;
+        default:
+          DCHECK(false) << type;
+          return 0;
+      }
+    default:
+      return 0;
+  }
+}
+
+// Maps the signed integer 'val' of type T to U so that unsigned comparison of the
+// results matches signed comparison of the inputs. The value is moved into the highest
+// bytes of U, so that narrower types are not dominated by wider ones, and 'mask' flips
+// the sign bit.
+template <typename U, typename T>
+U inline TupleRowZOrderComparator::GetSharedIntRepresentation(const T val, U mask) const {
+  // sizeof() is unsigned: guard the subtraction explicitly so that an instantiation
+  // with sizeof(U) < sizeof(T) gets a shift of 0 instead of a wrapped-around count.
+  constexpr size_t shift = sizeof(U) > sizeof(T) ? (sizeof(U) - sizeof(T)) * 8 : 0;
+  return (static_cast<U>(val) << shift) ^ mask;
+}
+
+// Maps the floating point value of type T stored at 'val' to U so that unsigned
+// comparison of the results matches numeric comparison of the inputs. NaN maps to 0 and
+// both zeros (+0.0 and -0.0) map to the same value. As with integers, the bits are
+// moved into the highest bytes of U.
+template <typename U, typename T>
+U inline TupleRowZOrderComparator::GetSharedFloatRepresentation(void* val, U mask) const {
+  static_assert(sizeof(T) <= sizeof(int64_t), "floating point type too wide");
+  const T floating_value = *reinterpret_cast<const T*>(val);
+  if (UNLIKELY(std::isnan(floating_value))) return 0;
+  // -0.0 has its sign bit set but is not "< 0.0"; without this it would map to 0 and
+  // sort below every negative value. +0.0 maps to exactly 'mask', so use that for both.
+  if (floating_value == 0) return mask;
+  // Zero-initialised so that the bytes not covered by a 4-byte float are defined.
+  int64_t tmp = 0;
+  memcpy(&tmp, &floating_value, sizeof(T));
+  // sizeof() is unsigned: guard the subtraction explicitly so that an instantiation
+  // with sizeof(U) < sizeof(T) gets a shift of 0 instead of a wrapped-around count.
+  constexpr size_t shift = sizeof(U) > sizeof(T) ? (sizeof(U) - sizeof(T)) * 8 : 0;
+  if (floating_value < 0.0) {
+    // Flipping all bits for negative values.
+    return static_cast<U>(~tmp) << shift;
+  } else {
+    // Flipping only first bit.
+    return (static_cast<U>(tmp) << shift) ^ mask;
+  }
+}
+
+// Packs up to sizeof(U) leading bytes of the string at 'char_ptr' into U with the first
+// character in the highest byte, so that unsigned comparison of the results matches
+// byte-wise comparison of those prefixes. Shorter strings are padded with zero bytes.
+// Returns 0 when 'length' is zero or negative.
+template <typename U>
+U inline TupleRowZOrderComparator::GetSharedStringRepresentation(const char* char_ptr,
+    int length) const {
+  // Checked before the comparison below so that a negative length is never read as a
+  // large size; it also keeps the final shift below the width of U.
+  if (length <= 0) return 0;
+  constexpr int max_len = static_cast<int>(sizeof(U));
+  const int len = length < max_len ? length : max_len;
+  U dst = 0;
+  // We copy the bytes from the string but swap the bytes because of integer endianness.
+  BitUtil::ByteSwap(&dst, char_ptr, len);
+  return dst << ((sizeof(U) - len) * 8);
+}
+
+// Code generation is not available for Z-ordering: logs a warning and always returns a
+// non-OK status. Neither 'codegen' nor 'fn' is used.
+Status TupleRowZOrderComparator::CodegenCompare(LlvmCodeGen* codegen,
+    llvm::Function** fn) {
+  LOG(WARNING) << "TupleRowZOrderComparator::CodegenCompare is not yet implemented.";
+  Status not_supported("TupleRowZOrderComparator has no Codegen'd comperator.");
+  return not_supported;
+}
+
diff --git a/be/src/util/tuple-row-compare.h b/be/src/util/tuple-row-compare.h
index e1933d5..fdaa1ed 100644
--- a/be/src/util/tuple-row-compare.h
+++ b/be/src/util/tuple-row-compare.h
@@ -57,22 +57,14 @@ class ComparatorWrapper {
   }
 };
 
-/// Compares two TupleRows based on a set of exprs, in order.
+/// Interface for comparing two TupleRows based on a set of exprs.
 class TupleRowComparator {
  public:
   /// 'ordering_exprs': the ordering expressions for tuple comparison.
-  /// 'is_asc' determines, for each expr, if it should be ascending or descending sort
-  /// order.
-  /// 'nulls_first' determines, for each expr, if nulls should come before or after all
-  /// other values.
-  TupleRowComparator(const std::vector<ScalarExpr*>& ordering_exprs,
-      const std::vector<bool>& is_asc, const std::vector<bool>& nulls_first)
+  TupleRowComparator(const std::vector<ScalarExpr*>& ordering_exprs)
     : ordering_exprs_(ordering_exprs),
-      is_asc_(is_asc),
-      codegend_compare_fn_(nullptr) {
-    DCHECK_EQ(is_asc_.size(), ordering_exprs.size());
-    for (bool null_first : nulls_first) nulls_first_.push_back(null_first ? -1 : 1);
-  }
+      codegend_compare_fn_(nullptr) { }
+  virtual ~TupleRowComparator() {}
 
   /// Create the evaluators for the ordering expressions and store them in 'pool'. The
   /// evaluators use 'expr_perm_pool' and 'expr_results_pool' for permanent and result
@@ -113,15 +105,7 @@ class TupleRowComparator {
     return Less(lhs_row, rhs_row);
   }
 
- private:
-  /// Interpreted implementation of Compare().
-  int CompareInterpreted(const TupleRow* lhs, const TupleRow* rhs) const;
-
-  /// Codegen Compare(). Returns a non-OK status if codegen is unsuccessful.
-  /// TODO: inline this at codegen'd callsites instead of indirectly calling via function
-  /// pointer.
-  Status CodegenCompare(LlvmCodeGen* codegen, llvm::Function** fn);
-
+ protected:
   /// References to ordering expressions owned by the Exec node which owns this
   /// TupleRowComparator.
   const std::vector<ScalarExpr*>& ordering_exprs_;
@@ -131,9 +115,6 @@ class TupleRowComparator {
   std::vector<ScalarExprEvaluator*> ordering_expr_evals_lhs_;
   std::vector<ScalarExprEvaluator*> ordering_expr_evals_rhs_;
 
-  const std::vector<bool>& is_asc_;
-  std::vector<int8_t> nulls_first_;
-
   /// We store a pointer to the codegen'd function pointer (adding an extra level of
   /// indirection) so that copies of this TupleRowComparator will have the same pointer to
   /// the codegen'd function. This is necessary because the codegen'd function pointer is
@@ -144,6 +125,85 @@ class TupleRowComparator {
   typedef int (*CompareFn)(ScalarExprEvaluator* const*, ScalarExprEvaluator* const*,
       const TupleRow*, const TupleRow*);
   CompareFn* codegend_compare_fn_;
+ private:
+  /// Interpreted implementation of Compare().
+  virtual int CompareInterpreted(const TupleRow* lhs, const TupleRow* rhs) const = 0;
+
+  /// Codegen Compare(). Returns a non-OK status if codegen is unsuccessful.
+  /// TODO: inline this at codegen'd callsites instead of indirectly calling via function
+  /// pointer.
+  virtual Status CodegenCompare(LlvmCodeGen* codegen, llvm::Function** fn) = 0;
+};
+
+/// Comparator that orders TupleRows lexicographically over a list of ordering exprs.
+class TupleRowLexicalComparator : public TupleRowComparator {
+ public:
+  /// 'ordering_exprs': the expressions the rows are compared on.
+  /// 'is_asc': one entry per expr, true for ascending and false for descending order.
+  /// This vector is kept by reference, so it has to outlive the comparator.
+  /// 'nulls_first': one entry per expr, telling whether nulls are placed before or
+  /// after all other values.
+  TupleRowLexicalComparator(const std::vector<ScalarExpr*>& ordering_exprs,
+      const std::vector<bool>& is_asc, const std::vector<bool>& nulls_first)
+    : TupleRowComparator(ordering_exprs),
+      is_asc_(is_asc) {
+    DCHECK_EQ(is_asc_.size(), ordering_exprs.size());
+    // Each flag is stored as -1 (nulls first) or 1 (nulls last).
+    for (const bool expr_nulls_first : nulls_first) {
+      const int8_t null_rank = expr_nulls_first ? -1 : 1;
+      nulls_first_.push_back(null_rank);
+    }
+  }
+
+ private:
+  int CompareInterpreted(const TupleRow* lhs, const TupleRow* rhs) const override;
+
+  /// TODO: implement it.
+  Status CodegenCompare(LlvmCodeGen* codegen, llvm::Function** fn) override;
+
+  /// Sort direction per ordering expr; refers to the vector passed to the constructor.
+  const std::vector<bool>& is_asc_;
+  /// Per ordering expr: -1 if nulls come first, 1 otherwise.
+  std::vector<int8_t> nulls_first_;
+};
+
+/// Comparator that orders TupleRows by Z-order over a list of ordering exprs.
+class TupleRowZOrderComparator : public TupleRowComparator {
+ public:
+  /// 'ordering_exprs': the expressions the rows are compared on.
+  TupleRowZOrderComparator(const std::vector<ScalarExpr*>& ordering_exprs)
+    : TupleRowComparator(ordering_exprs) {}
+
+ private:
+  using uint128_t = __uint128_t;
+
+  int CompareInterpreted(const TupleRow* lhs, const TupleRow* rhs) const override;
+  Status CodegenCompare(LlvmCodeGen* codegen, llvm::Function** fn) override;
+
+  /// Z-order comparison of 'lhs' and 'rhs'. The actual Z-value is never calculated;
+  /// instead the column with the most significant dimension is located and only the
+  /// values of that column are compared. This requires a unified type in which all
+  /// values share the same bit-representation. Every type is converted to one of three
+  /// common types: uint32_t, uint64_t or uint128_t. If smaller types (i.e. types with
+  /// fewer bits) were compared with bigger ones as they are, the bigger type would be
+  /// much more dominant, so the bits of the smaller types are shifted up.
+  template <typename U>
+  int CompareBasedOnSize(const TupleRow* lhs, const TupleRow* rhs) const;
+
+  /// The original values a and b are transformed into their "shared representation" a'
+  /// and b' such that if a < b then the bits of a' are lexically less than the bits of
+  /// b'. For ints this means INT_MIN becomes 0, INT_MIN+1 becomes 1, and so on, until
+  /// INT_MAX becomes 111..111.
+  /// The shared representation is obtained as follows:
+  /// 1. Reinterpret the void* as the actual type.
+  /// 2. Convert the number to the chosen unsigned type (U).
+  /// 3. If U is bigger than the actual type, shift the bits of the small type up.
+  /// 4. Flip the sign bit, because the value was converted to unsigned.
+  /// Floating point values are handled differently: for negative values all bits have
+  /// to be flipped.
+  /// Null values are treated as the minimum value (unsigned 0).
+  template <typename U>
+  U GetSharedRepresentation(void* val, ColumnType type) const;
+  template <typename U>
+  U inline GetSharedStringRepresentation(const char* char_ptr, int length) const;
+  template <typename U, typename T>
+  U inline GetSharedIntRepresentation(const T val, U mask) const;
+  template <typename U, typename T>
+  U inline GetSharedFloatRepresentation(void* val, U mask) const;
 };
 
 /// Compares the equality of two Tuples, going slot by slot.
diff --git a/fe/src/main/java/org/apache/impala/analysis/TableDef.java b/fe/src/main/java/org/apache/impala/analysis/TableDef.java
index 40bae14..60ef906 100644
--- a/fe/src/main/java/org/apache/impala/analysis/TableDef.java
+++ b/fe/src/main/java/org/apache/impala/analysis/TableDef.java
@@ -667,19 +667,6 @@ class TableDef {
             "equivalent to SORT BY. Please, use the latter, if that was your " +
             "intention."));
       }
-
-      List<? extends Type> notSupportedTypes = Arrays.asList(Type.STRING, Type.VARCHAR,
-          Type.FLOAT, Type.DOUBLE);
-      for (Integer position : colIdxs) {
-        Type colType = columnTypes.get(position);
-
-        if (notSupportedTypes.stream().anyMatch(type -> colType.matchesType(type))) {
-          throw new AnalysisException(String.format("SORT BY ZORDER does not support "
-              + "column types: %s", String.join(", ",
-                  notSupportedTypes.stream().map(type -> type.toString())
-                  .collect(Collectors.toList()))));
-        }
-      }
     }
 
     Preconditions.checkState(numColumns == colIdxs.size());
diff --git a/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java b/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java
index a0faae2..a64b98a 100644
--- a/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java
@@ -1298,6 +1298,8 @@ public class AnalyzeDDLTest extends FrontendTestBase {
 
     AnalyzesOk("alter table functional.alltypes sort by zorder (int_col,id)");
     AnalyzesOk("alter table functional.alltypes sort by zorder (bool_col,int_col,id)");
+    AnalyzesOk(
+      "alter table functional.alltypes sort by zorder (timestamp_col, string_col)");
     AnalyzesOk("alter table functional.alltypes sort by zorder ()");
     AnalysisError("alter table functional.alltypes sort by zorder (id)",
         "SORT BY ZORDER with 1 column is equivalent to SORT BY. Please, use the " +
@@ -1308,9 +1310,6 @@ public class AnalyzeDDLTest extends FrontendTestBase {
         "not find SORT BY column 'foo' in table.");
     AnalysisError("alter table functional_hbase.alltypes sort by zorder (id, foo)",
         "ALTER TABLE SORT BY not supported on HBase tables.");
-    AnalysisError("alter table functional.alltypes sort by zorder (bool_col,string_col)",
-        "SORT BY ZORDER does not support column types: STRING, VARCHAR(*), FLOAT, " +
-        "DOUBLE");
 
     BackendConfig.INSTANCE.setZOrderSortUnlocked(false);
   }
@@ -1971,10 +1970,6 @@ public class AnalyzeDDLTest extends FrontendTestBase {
         + "'/test-warehouse/schemas/zipcode_incomes.parquet' sort by (id,zip)");
     AnalyzesOk("create table newtbl_DNE like parquet "
         + "'/test-warehouse/schemas/decimal.parquet' sort by zorder (d32, d11)");
-    AnalysisError("create table newtbl_DNE like parquet "
-        + "'/test-warehouse/schemas/zipcode_incomes.parquet' sort by  zorder (id,zip)",
-        "SORT BY ZORDER does not support column types: STRING, VARCHAR(*), FLOAT, " +
-        "DOUBLE");
     AnalyzesOk("create table if not exists functional.zipcode_incomes like parquet "
         + "'/test-warehouse/schemas/zipcode_incomes.parquet'");
     AnalyzesOk("create table if not exists newtbl_DNE like parquet "
@@ -2350,15 +2345,17 @@ public class AnalyzeDDLTest extends FrontendTestBase {
     AnalysisError("create table tbl sort by (int_col,foo) like functional.alltypes",
         "Could not find SORT BY column 'foo' in table.");
 
-    // Test zsort columns.
+    // Test sort by zorder columns.
     BackendConfig.INSTANCE.setZOrderSortUnlocked(true);
 
     AnalyzesOk("create table tbl sort by zorder (int_col,id) like functional.alltypes");
+    AnalyzesOk("create table tbl sort by zorder (float_col,id) like functional.alltypes");
+    AnalyzesOk("create table tbl sort by zorder (double_col,id) like " +
+        "functional.alltypes");
+    AnalyzesOk("create table tbl sort by zorder (string_col,timestamp_col) like " +
+        "functional.alltypes");
     AnalysisError("create table tbl sort by zorder (int_col,foo) like " +
         "functional.alltypes", "Could not find SORT BY column 'foo' in table.");
-    AnalysisError("create table tbl sort by zorder (string_col,id) like " +
-        "functional.alltypes", "SORT BY ZORDER does not support column types: STRING, " +
-        "VARCHAR(*), FLOAT, DOUBLE");
 
     BackendConfig.INSTANCE.setZOrderSortUnlocked(false);
   }
@@ -2694,19 +2691,6 @@ public class AnalyzeDDLTest extends FrontendTestBase {
     AnalysisError("create table functional.new_table (i int) PARTITIONED BY (d decimal)" +
         "sort by zorder (i, d)", "SORT BY column list must not contain partition " +
         "column: 'd'");
-    // For Z-Order, string, varchar, float and double columns are not supported.
-    AnalysisError("create table functional.new_table (i int, s string) sort by zorder " +
-        "(i, s)", "SORT BY ZORDER does not support column types: STRING, VARCHAR(*), " +
-        "FLOAT, DOUBLE");
-    AnalysisError("create table functional.new_table (i int, s varchar(32)) sort by " +
-        "zorder (i, s)", "SORT BY ZORDER does not support column types: STRING, " +
-        "VARCHAR(*), FLOAT, DOUBLE");
-    AnalysisError("create table functional.new_table (i int, s double) sort by zorder " +
-        "(i, s)", "SORT BY ZORDER does not support column types: STRING, VARCHAR(*), " +
-        "FLOAT, DOUBLE");
-    AnalysisError("create table functional.new_table (i int, s float) sort by zorder " +
-        "(i, s)", "SORT BY ZORDER does not support column types: STRING, VARCHAR(*), " +
-        "FLOAT, DOUBLE");
 
     BackendConfig.INSTANCE.setZOrderSortUnlocked(false);
   }