You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by as...@apache.org on 2020/02/28 23:03:19 UTC

[impala] branch master updated (3fb376b -> 519093f)

This is an automated email from the ASF dual-hosted git repository.

asherman pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git.


    from 3fb376b  IMPALA-6689: Speed up point lookup for Kudu primary key
     new c48efd4  IMPALA-9226: Improve string allocations of the ORC scanner
     new 60a9e72  IMPALA-8755: Backend support for Z-ordering
     new 519093f  IMPALA-9389: [DOCS] Support reading zstd text files

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 be/src/exec/exchange-node.cc                       |   2 +-
 be/src/exec/hdfs-orc-scanner.cc                    |  21 +-
 be/src/exec/hdfs-orc-scanner.h                     |   7 +
 be/src/exec/hdfs-table-sink.cc                     |   1 +
 be/src/exec/hdfs-table-sink.h                      |   4 +
 be/src/exec/orc-column-readers.cc                  |  83 ++-
 be/src/exec/orc-column-readers.h                   |  66 +-
 be/src/exec/parquet/hdfs-parquet-table-writer.cc   |  16 +-
 be/src/exec/partial-sort-node.cc                   |   6 +-
 be/src/exec/partial-sort-node.h                    |   2 +
 be/src/exec/sort-node.cc                           |   6 +-
 be/src/exec/sort-node.h                            |   2 +
 be/src/exec/topn-node.cc                           |   2 +-
 be/src/runtime/data-stream-test.cc                 |   2 +-
 be/src/runtime/sorter.cc                           |  31 +-
 be/src/runtime/sorter.h                            |   4 +-
 be/src/util/CMakeLists.txt                         |   2 +
 be/src/util/tuple-row-compare-test.cc              | 803 +++++++++++++++++++++
 be/src/util/tuple-row-compare.cc                   | 178 ++++-
 be/src/util/tuple-row-compare.h                    | 108 ++-
 docs/topics/impala_file_formats.xml                |   6 +-
 docs/topics/impala_txtfile.xml                     |  74 +-
 .../java/org/apache/impala/analysis/TableDef.java  |  13 -
 .../org/apache/impala/analysis/AnalyzeDDLTest.java |  32 +-
 24 files changed, 1294 insertions(+), 177 deletions(-)
 create mode 100644 be/src/util/tuple-row-compare-test.cc


[impala] 02/03: IMPALA-8755: Backend support for Z-ordering

Posted by as...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

asherman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 60a9e72faf7b8c3e034f4319c0761ed389994707
Author: norbert.luksa <no...@cloudera.com>
AuthorDate: Fri Oct 18 15:23:42 2019 +0200

    IMPALA-8755: Backend support for Z-ordering
    
    This change depends on gerrit.cloudera.org/#/c/13955/
    (Frontend support for Z-ordering)
    
    The commit adds a Comparator based on Z-ordering. See in detail:
    https://en.wikipedia.org/wiki/Z-order_curve
    
    The comparator instead of calculating the Z-values of the rows,
    looks for the column with the most significant dimension, and
    compares the values of this column only. The most significant
    dimension will be the one where the compared values have the
    highest different bits. The algorithm requires values of
    the same binary representation, therefore the values are
    converted into either uint32_t, uint63_t or uint128_t, the
    smallest in which all data fits. Comparing smaller types with
    bigger ones would make the bigger type much more dominant
    therefore the bits of these smaller types are shifted up.
    
    All primitive types (including string and floating point types)
    are supported.
    
    Testing:
     * Added unit tests.
     * Run manual tests, comparing 4-column values with 4-bit
       integers, for all possible combinations. Checked the result by
       calculating the Z-value for each comparison.
     * Tested performance on various data, getting great results for
       selective queries. An example: used the TPCH dataset's
       lineitem table with scale 25, where the sorting columns are
       l_partkey and l_suppkey, in that order. Run selective queries
       for the value range of the two columns, for both lexical and
       Z-ordering and compared the percentage of filtered pages and
       row groups. While queries with filters on the first column
       showed almost no difference, queries on the second column
       is in favour of Z-ordering:
       Ordering | Column | Filtered pages % | Filtered row groups %
       Lex.       1st      ~99%               ~90%
       Z-ord.     1st      ~99%               ~89%
       Lex.       2nd      ~25%               0%
       Z-ord.     2nd      ~97%               0%
       The only drawback is the sorting itself, taking ~4 times more
       than lexical sorting (eg. sorting for the dataset above took
       14m for Lexical, and 55m for Z-ordering).
       Note however, that this is a one-time thing to do, sorting
       only happens once, when writing the data.
       Also, lexical ordering is supported by codegen, while it is
       not implemented for Z-ordering yet.
    
    Change-Id: I0200748ce3e65ebc5d3530f794c0f80aa335a2ab
    Reviewed-on: http://gerrit.cloudera.org:8080/14080
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/exec/exchange-node.cc                       |   2 +-
 be/src/exec/hdfs-table-sink.cc                     |   1 +
 be/src/exec/hdfs-table-sink.h                      |   4 +
 be/src/exec/parquet/hdfs-parquet-table-writer.cc   |  16 +-
 be/src/exec/partial-sort-node.cc                   |   6 +-
 be/src/exec/partial-sort-node.h                    |   2 +
 be/src/exec/sort-node.cc                           |   6 +-
 be/src/exec/sort-node.h                            |   2 +
 be/src/exec/topn-node.cc                           |   2 +-
 be/src/runtime/data-stream-test.cc                 |   2 +-
 be/src/runtime/sorter.cc                           |  31 +-
 be/src/runtime/sorter.h                            |   4 +-
 be/src/util/CMakeLists.txt                         |   2 +
 be/src/util/tuple-row-compare-test.cc              | 803 +++++++++++++++++++++
 be/src/util/tuple-row-compare.cc                   | 178 ++++-
 be/src/util/tuple-row-compare.h                    | 108 ++-
 .../java/org/apache/impala/analysis/TableDef.java  |  13 -
 .../org/apache/impala/analysis/AnalyzeDDLTest.java |  32 +-
 18 files changed, 1119 insertions(+), 95 deletions(-)

diff --git a/be/src/exec/exchange-node.cc b/be/src/exec/exchange-node.cc
index 5a59398..a3e0726 100644
--- a/be/src/exec/exchange-node.cc
+++ b/be/src/exec/exchange-node.cc
@@ -107,7 +107,7 @@ Status ExchangeNode::Prepare(RuntimeState* state) {
 
   if (is_merging_) {
     less_than_.reset(
-        new TupleRowComparator(ordering_exprs_, is_asc_order_, nulls_first_));
+        new TupleRowLexicalComparator(ordering_exprs_, is_asc_order_, nulls_first_));
     state->CheckAndAddCodegenDisabledMessage(runtime_profile());
   }
   return Status::OK();
diff --git a/be/src/exec/hdfs-table-sink.cc b/be/src/exec/hdfs-table-sink.cc
index 2674e72..05a00ee 100644
--- a/be/src/exec/hdfs-table-sink.cc
+++ b/be/src/exec/hdfs-table-sink.cc
@@ -82,6 +82,7 @@ HdfsTableSink::HdfsTableSink(TDataSinkId sink_id, const HdfsTableSinkConfig& sin
     overwrite_(hdfs_sink.overwrite),
     input_is_clustered_(hdfs_sink.input_is_clustered),
     sort_columns_(hdfs_sink.sort_columns),
+    sorting_order_((TSortingOrder::type)hdfs_sink.sorting_order),
     current_clustered_partition_(nullptr),
     partition_key_exprs_(sink_config.partition_key_exprs_) {
   if (hdfs_sink.__isset.write_id) {
diff --git a/be/src/exec/hdfs-table-sink.h b/be/src/exec/hdfs-table-sink.h
index 2e790fd..0ea7009 100644
--- a/be/src/exec/hdfs-table-sink.h
+++ b/be/src/exec/hdfs-table-sink.h
@@ -173,6 +173,7 @@ class HdfsTableSink : public DataSink {
 
   int skip_header_line_count() const { return skip_header_line_count_; }
   const vector<int32_t>& sort_columns() const { return sort_columns_; }
+  TSortingOrder::type sorting_order() const { return sorting_order_; }
   const HdfsTableDescriptor& TableDesc() { return *table_desc_; }
 
   RuntimeProfile::Counter* rows_inserted_counter() { return rows_inserted_counter_; }
@@ -291,6 +292,9 @@ class HdfsTableSink : public DataSink {
   // populate the RowGroup::sorting_columns list in parquet files.
   const std::vector<int32_t>& sort_columns_;
 
+  // Represents the sorting order used in SORT BY queries.
+  const TSortingOrder::type sorting_order_;
+
   /// Stores the current partition during clustered inserts across subsequent row batches.
   /// Only set if 'input_is_clustered_' is true.
   PartitionPair* current_clustered_partition_;
diff --git a/be/src/exec/parquet/hdfs-parquet-table-writer.cc b/be/src/exec/parquet/hdfs-parquet-table-writer.cc
index b6a4391..7225384 100644
--- a/be/src/exec/parquet/hdfs-parquet-table-writer.cc
+++ b/be/src/exec/parquet/hdfs-parquet-table-writer.cc
@@ -1394,13 +1394,17 @@ Status HdfsParquetTableWriter::FlushCurrentRowGroup() {
   }
 
   // Populate RowGroup::sorting_columns with all columns specified by the Frontend.
-  for (int col_idx : parent_->sort_columns()) {
-    current_row_group_->sorting_columns.push_back(parquet::SortingColumn());
-    parquet::SortingColumn& sorting_column = current_row_group_->sorting_columns.back();
-    sorting_column.column_idx = col_idx;
-    sorting_column.descending = false;
-    sorting_column.nulls_first = false;
+  // Do that only, if the sorting type is lexical.
+  if (parent_->sorting_order() == TSortingOrder::LEXICAL){
+    for (int col_idx : parent_->sort_columns()) {
+      current_row_group_->sorting_columns.push_back(parquet::SortingColumn());
+      parquet::SortingColumn& sorting_column = current_row_group_->sorting_columns.back();
+      sorting_column.column_idx = col_idx;
+      sorting_column.descending = false;
+      sorting_column.nulls_first = false;
+    }
   }
+
   current_row_group_->__isset.sorting_columns =
       !current_row_group_->sorting_columns.empty();
 
diff --git a/be/src/exec/partial-sort-node.cc b/be/src/exec/partial-sort-node.cc
index f8cdf8b..1586f46 100644
--- a/be/src/exec/partial-sort-node.cc
+++ b/be/src/exec/partial-sort-node.cc
@@ -38,6 +38,8 @@ Status PartialSortPlanNode::Init(const TPlanNode& tnode, RuntimeState* state) {
       *children_[0]->row_descriptor_, state, &sort_tuple_slot_exprs_));
   is_asc_order_ = tnode.sort_node.sort_info.is_asc_order;
   nulls_first_ = tnode.sort_node.sort_info.nulls_first;
+  sorting_order_ = static_cast<TSortingOrder::type>(
+      tnode.sort_node.sort_info.sorting_order);
   return Status::OK();
 }
 
@@ -58,6 +60,7 @@ PartialSortNode::PartialSortNode(
   sort_tuple_exprs_ = pnode.sort_tuple_slot_exprs_;
   is_asc_order_ = pnode.is_asc_order_;
   nulls_first_ = pnode.nulls_first_;
+  sorting_order_ = pnode.sorting_order_;
   runtime_profile()->AddInfoString("SortType", "Partial");
 }
 
@@ -70,7 +73,8 @@ Status PartialSortNode::Prepare(RuntimeState* state) {
   RETURN_IF_ERROR(ExecNode::Prepare(state));
   sorter_.reset(new Sorter(ordering_exprs_, is_asc_order_, nulls_first_,
       sort_tuple_exprs_, &row_descriptor_, mem_tracker(), buffer_pool_client(),
-      resource_profile_.spillable_buffer_size, runtime_profile(), state, label(), false));
+      resource_profile_.spillable_buffer_size, runtime_profile(), state, label(), false,
+      sorting_order_));
   RETURN_IF_ERROR(sorter_->Prepare(pool_));
   DCHECK_GE(resource_profile_.min_reservation, sorter_->ComputeMinReservation());
   state->CheckAndAddCodegenDisabledMessage(runtime_profile());
diff --git a/be/src/exec/partial-sort-node.h b/be/src/exec/partial-sort-node.h
index 12d460a..9cbe988 100644
--- a/be/src/exec/partial-sort-node.h
+++ b/be/src/exec/partial-sort-node.h
@@ -39,6 +39,7 @@ class PartialSortPlanNode : public PlanNode {
 
   std::vector<bool> is_asc_order_;
   std::vector<bool> nulls_first_;
+  TSortingOrder::type sorting_order_;
 };
 
 /// Node that implements a partial sort, where its input is divided up into runs, each
@@ -85,6 +86,7 @@ class PartialSortNode : public ExecNode {
 
   std::vector<bool> is_asc_order_;
   std::vector<bool> nulls_first_;
+  TSortingOrder::type sorting_order_;
 
   /////////////////////////////////////////
   /// BEGIN: Members that must be Reset()
diff --git a/be/src/exec/sort-node.cc b/be/src/exec/sort-node.cc
index 2efe40e..5943fc0 100644
--- a/be/src/exec/sort-node.cc
+++ b/be/src/exec/sort-node.cc
@@ -37,6 +37,8 @@ Status SortPlanNode::Init(const TPlanNode& tnode, RuntimeState* state) {
       *children_[0]->row_descriptor_, state, &sort_tuple_slot_exprs_));
   is_asc_order_ = tnode.sort_node.sort_info.is_asc_order;
   nulls_first_ = tnode.sort_node.sort_info.nulls_first;
+  sorting_order_ = static_cast<TSortingOrder::type>(
+      tnode.sort_node.sort_info.sorting_order);
   return Status::OK();
 }
 
@@ -56,6 +58,7 @@ SortNode::SortNode(
   sort_tuple_exprs_ = pnode.sort_tuple_slot_exprs_;
   is_asc_order_ = pnode.is_asc_order_;
   nulls_first_ = pnode.nulls_first_;
+  sorting_order_ = pnode.sorting_order_;
   runtime_profile()->AddInfoString("SortType", "Total");
 }
 
@@ -67,7 +70,8 @@ Status SortNode::Prepare(RuntimeState* state) {
   RETURN_IF_ERROR(ExecNode::Prepare(state));
   sorter_.reset(new Sorter(ordering_exprs_, is_asc_order_, nulls_first_,
       sort_tuple_exprs_, &row_descriptor_, mem_tracker(), buffer_pool_client(),
-      resource_profile_.spillable_buffer_size, runtime_profile(), state, label(), true));
+      resource_profile_.spillable_buffer_size, runtime_profile(), state, label(), true,
+      sorting_order_));
   RETURN_IF_ERROR(sorter_->Prepare(pool_));
   DCHECK_GE(resource_profile_.min_reservation, sorter_->ComputeMinReservation());
   state->CheckAndAddCodegenDisabledMessage(runtime_profile());
diff --git a/be/src/exec/sort-node.h b/be/src/exec/sort-node.h
index f6fdd52..8de3c01 100644
--- a/be/src/exec/sort-node.h
+++ b/be/src/exec/sort-node.h
@@ -39,6 +39,7 @@ class SortPlanNode : public PlanNode {
 
   std::vector<bool> is_asc_order_;
   std::vector<bool> nulls_first_;
+  TSortingOrder::type sorting_order_;
 };
 
 /// Node that implements a full sort of its input with a fixed memory budget, spilling
@@ -82,6 +83,7 @@ class SortNode : public ExecNode {
 
   std::vector<bool> is_asc_order_;
   std::vector<bool> nulls_first_;
+  TSortingOrder::type sorting_order_;
 
   /// Whether the previous call to GetNext() returned a buffer attached to the RowBatch.
   /// Used to avoid unnecessary calls to ReleaseUnusedReservation().
diff --git a/be/src/exec/topn-node.cc b/be/src/exec/topn-node.cc
index b67e03a..7f21316 100644
--- a/be/src/exec/topn-node.cc
+++ b/be/src/exec/topn-node.cc
@@ -88,7 +88,7 @@ Status TopNNode::Prepare(RuntimeState* state) {
   RETURN_IF_ERROR(ScalarExprEvaluator::Create(output_tuple_exprs_, state, pool_,
       expr_perm_pool(), expr_results_pool(), &output_tuple_expr_evals_));
   tuple_row_less_than_.reset(
-      new TupleRowComparator(ordering_exprs_, is_asc_order_, nulls_first_));
+      new TupleRowLexicalComparator(ordering_exprs_, is_asc_order_, nulls_first_));
   output_tuple_desc_ = row_descriptor_.tuple_descriptors()[0];
   insert_batch_timer_ = ADD_TIMER(runtime_profile(), "InsertBatchTime");
   state->CheckAndAddCodegenDisabledMessage(runtime_profile());
diff --git a/be/src/runtime/data-stream-test.cc b/be/src/runtime/data-stream-test.cc
index d8dc832..23720ce 100644
--- a/be/src/runtime/data-stream-test.cc
+++ b/be/src/runtime/data-stream-test.cc
@@ -346,7 +346,7 @@ class DataStreamTest : public testing::Test {
     SlotRef* lhs_slot = obj_pool_.Add(new SlotRef(TYPE_BIGINT, 0));
     ASSERT_OK(lhs_slot->Init(RowDescriptor(), true, runtime_state_.get()));
     ordering_exprs_.push_back(lhs_slot);
-    less_than_ = obj_pool_.Add(new TupleRowComparator(ordering_exprs_,
+    less_than_ = obj_pool_.Add(new TupleRowLexicalComparator(ordering_exprs_,
         is_asc_, nulls_first_));
     ASSERT_OK(less_than_->Open(
         &obj_pool_, runtime_state_.get(), mem_pool_.get(), mem_pool_.get()));
diff --git a/be/src/runtime/sorter.cc b/be/src/runtime/sorter.cc
index e2f7a76..947d49d 100644
--- a/be/src/runtime/sorter.cc
+++ b/be/src/runtime/sorter.cc
@@ -763,12 +763,13 @@ Sorter::Sorter(const std::vector<ScalarExpr*>& ordering_exprs,
     const vector<ScalarExpr*>& sort_tuple_exprs, RowDescriptor* output_row_desc,
     MemTracker* mem_tracker, BufferPool::ClientHandle* buffer_pool_client,
     int64_t page_len, RuntimeProfile* profile, RuntimeState* state,
-    const string& node_label, bool enable_spilling)
+    const string& node_label, bool enable_spilling,
+    TSortingOrder::type sorting_order)
   : node_label_(node_label),
     state_(state),
     expr_perm_pool_(mem_tracker),
     expr_results_pool_(mem_tracker),
-    compare_less_than_(ordering_exprs, is_asc_order, nulls_first),
+    compare_less_than_(nullptr),
     in_mem_tuple_sorter_(nullptr),
     buffer_pool_client_(buffer_pool_client),
     page_len_(page_len),
@@ -784,7 +785,19 @@ Sorter::Sorter(const std::vector<ScalarExpr*>& ordering_exprs,
     num_merges_counter_(nullptr),
     in_mem_sort_timer_(nullptr),
     sorted_data_size_(nullptr),
-    run_sizes_(nullptr) {}
+    run_sizes_(nullptr) {
+  switch (sorting_order) {
+    case TSortingOrder::LEXICAL:
+      compare_less_than_.reset(new TupleRowLexicalComparator(ordering_exprs, is_asc_order,
+          nulls_first));
+      break;
+    case TSortingOrder::ZORDER:
+      compare_less_than_.reset(new TupleRowZOrderComparator(ordering_exprs));
+      break;
+    default:
+      DCHECK(false);
+  }
+}
 
 Sorter::~Sorter() {
   DCHECK(sorted_runs_.empty());
@@ -812,7 +825,7 @@ Status Sorter::Prepare(ObjectPool* obj_pool) {
   }
   has_var_len_slots_ = sort_tuple_desc->HasVarlenSlots();
   in_mem_tuple_sorter_.reset(
-      new TupleSorter(this, compare_less_than_, sort_tuple_desc->byte_size(), state_));
+      new TupleSorter(this, *compare_less_than_, sort_tuple_desc->byte_size(), state_));
 
   if (enable_spilling_) {
     initial_runs_counter_ = ADD_COUNTER(profile_, "InitialRunsCreated", TUnit::UNIT);
@@ -831,13 +844,13 @@ Status Sorter::Prepare(ObjectPool* obj_pool) {
 }
 
 Status Sorter::Codegen(RuntimeState* state) {
-  return compare_less_than_.Codegen(state);
+  return compare_less_than_->Codegen(state);
 }
 
 Status Sorter::Open() {
   DCHECK(in_mem_tuple_sorter_ != nullptr) << "Not prepared";
   DCHECK(unsorted_run_ == nullptr) << "Already open";
-  RETURN_IF_ERROR(compare_less_than_.Open(&obj_pool_, state_, &expr_perm_pool_,
+  RETURN_IF_ERROR(compare_less_than_->Open(&obj_pool_, state_, &expr_perm_pool_,
       &expr_results_pool_));
   TupleDescriptor* sort_tuple_desc = output_row_desc_->tuple_descriptors()[0];
   unsorted_run_ = run_pool_.Add(new Run(this, sort_tuple_desc, true));
@@ -927,12 +940,12 @@ void Sorter::Reset() {
   merger_.reset();
   // Free resources from the current runs.
   CleanupAllRuns();
-  compare_less_than_.Close(state_);
+  compare_less_than_->Close(state_);
 }
 
 void Sorter::Close(RuntimeState* state) {
   CleanupAllRuns();
-  compare_less_than_.Close(state);
+  compare_less_than_->Close(state);
   ScalarExprEvaluator::Close(sort_tuple_expr_evals_, state);
   expr_perm_pool_.FreeAll();
   expr_results_pool_.FreeAll();
@@ -1069,7 +1082,7 @@ Status Sorter::CreateMerger(int num_runs) {
   // from the runs being merged. This is unnecessary overhead that is not required if we
   // correctly transfer resources.
   merger_.reset(
-      new SortedRunMerger(compare_less_than_, output_row_desc_, profile_, true));
+      new SortedRunMerger(*compare_less_than_, output_row_desc_, profile_, true));
 
   vector<function<Status (RowBatch**)>> merge_runs;
   merge_runs.reserve(num_runs);
diff --git a/be/src/runtime/sorter.h b/be/src/runtime/sorter.h
index 2040b46..cb06fab 100644
--- a/be/src/runtime/sorter.h
+++ b/be/src/runtime/sorter.h
@@ -108,7 +108,7 @@ class Sorter {
       const std::vector<ScalarExpr*>& sort_tuple_exprs, RowDescriptor* output_row_desc,
       MemTracker* mem_tracker, BufferPool::ClientHandle* client, int64_t page_len,
       RuntimeProfile* profile, RuntimeState* state, const std::string& node_label,
-      bool enable_spilling);
+      bool enable_spilling, TSortingOrder::type sorting_order);
   ~Sorter();
 
   /// Initial set-up of the sorter for execution.
@@ -221,7 +221,7 @@ class Sorter {
   MemPool expr_results_pool_;
 
   /// In memory sorter and less-than comparator.
-  TupleRowComparator compare_less_than_;
+  boost::scoped_ptr<TupleRowComparator> compare_less_than_;
   boost::scoped_ptr<TupleSorter> in_mem_tuple_sorter_;
 
   /// Client used to allocate pages from the buffer pool. Not owned.
diff --git a/be/src/util/CMakeLists.txt b/be/src/util/CMakeLists.txt
index 21127cb..c8cd735 100644
--- a/be/src/util/CMakeLists.txt
+++ b/be/src/util/CMakeLists.txt
@@ -143,6 +143,7 @@ add_library(UtilTests STATIC
   system-state-info-test.cc
   thread-pool-test.cc
   time-test.cc
+  tuple-row-compare-test.cc
   uid-util-test.cc
   zip-util-test.cc
 )
@@ -207,6 +208,7 @@ ADD_UNIFIED_BE_LSAN_TEST(system-state-info-test "SystemStateInfoTest.*")
 ADD_UNIFIED_BE_LSAN_TEST(sys-info-test "CpuInfoTest.*:DiskInfoTest.*")
 ADD_UNIFIED_BE_LSAN_TEST(thread-pool-test "ThreadPoolTest.*")
 ADD_UNIFIED_BE_LSAN_TEST(time-test "TimeTest.*")
+ADD_UNIFIED_BE_LSAN_TEST(tuple-row-compare-test "TupleRowCompareTest.*")
 ADD_UNIFIED_BE_LSAN_TEST(uid-util-test "UidUtil.*")
 # Using standalone webserver-test for now, nonstandard main() passes in a port.
 ADD_BE_LSAN_TEST(webserver-test)
diff --git a/be/src/util/tuple-row-compare-test.cc b/be/src/util/tuple-row-compare-test.cc
new file mode 100644
index 0000000..2c1b196
--- /dev/null
+++ b/be/src/util/tuple-row-compare-test.cc
@@ -0,0 +1,803 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <boost/scoped_ptr.hpp>
+
+#include "exprs/slot-ref.h"
+#include "runtime/mem-pool.h"
+#include "runtime/mem-tracker.h"
+#include "runtime/test-env.h"
+#include "runtime/timestamp-value.inline.h"
+#include "runtime/tuple-row.h"
+#include "runtime/types.h"
+#include "testutil/gtest-util.h"
+#include "util/tuple-row-compare.h"
+
+#include "common/names.h"
+
+namespace impala {
+
+class TupleRowCompareTest : public testing::Test {
+ public:
+  TupleRowCompareTest() : expr_perm_pool_(&tracker_), expr_results_pool_(&tracker_) {}
+
+ protected:
+  typedef __uint128_t uint128_t;
+  /// Temporary runtime environment for the TupleRowComperator.
+  scoped_ptr<TestEnv> test_env_;
+  RuntimeState* runtime_state_;
+  RowDescriptor desc_;
+
+  ObjectPool pool_;
+  /// A dummy MemTracker used for exprs and other things we don't need to have limits on.
+  MemTracker tracker_;
+  MemPool expr_perm_pool_;
+  MemPool expr_results_pool_;
+  scoped_ptr<TupleRowZOrderComparator> comperator_;
+
+  vector<ScalarExpr*> ordering_exprs_;
+
+  virtual void SetUp() {
+    test_env_.reset(new TestEnv());
+    ASSERT_OK(test_env_->Init());
+  }
+
+  virtual void TearDown() {
+    comperator_->Close(runtime_state_);
+    ScalarExpr::Close(ordering_exprs_);
+
+    runtime_state_ = nullptr;
+    test_env_.reset();
+    expr_perm_pool_.FreeAll();
+    expr_results_pool_.FreeAll();
+    pool_.Clear();
+  }
+
+  void LoadComperator(int offset) {
+    comperator_.reset(new TupleRowZOrderComparator(ordering_exprs_));
+    ASSERT_OK(comperator_->Open(&pool_, runtime_state_, &expr_perm_pool_,
+        &expr_results_pool_));
+  }
+
+  // Only ColumnType should be passed as template parameter.
+  template <typename... Types>
+  void LoadComperator(int offset, ColumnType type, Types... types) {
+    //We are trying to fit into one slot, so the offset has to be < sizeof(int)*8.
+    DCHECK_LT(offset, sizeof(int) * 8) << "Too many columns added.";
+    SlotRef* build_expr = pool_.Add(new SlotRef(type, offset, true /* nullable */));
+    ASSERT_OK(build_expr->Init(desc_, true, nullptr));
+    ordering_exprs_.push_back(build_expr);
+    LoadComperator(offset + type.GetSlotSize(), types...);
+  }
+
+  template <typename... Types>
+  void CreateComperator(Types... types) {
+    ordering_exprs_.clear();
+    LoadComperator(1, types...);
+  }
+
+  template <bool IS_FIRST_SLOT_NULL = false, typename... Args>
+  TupleRow* CreateTupleRow(Args... args) {
+    // Only one null byte is allocated, so the joint slot size has to be < sizeof(int)*8.
+    DCHECK_LE(sizeof...(args), 8);
+    uint8_t* tuple_row_mem = expr_perm_pool_.Allocate(
+        sizeof(char*) + sizeof(int32_t*) * sizeof...(args));
+    Tuple* tuple_mem = Tuple::Create(sizeof(char) + GetSize(args...), &expr_perm_pool_);
+    if (IS_FIRST_SLOT_NULL) tuple_mem->SetNull(NullIndicatorOffset(0, 1));
+    FillMem(tuple_mem, 1, args...);
+    TupleRow* row = reinterpret_cast<TupleRow*>(tuple_row_mem);
+    row->SetTuple(0, tuple_mem);
+    return row;
+  }
+
+  unsigned GetSize() { return 0; }
+
+  template <typename T, typename... Args>
+  unsigned GetSize(const T & head, const Args &... tail) {
+      return sizeof(T) + GetSize(tail...);
+  }
+
+  template <typename T>
+  void FillMem(Tuple* tuple_mem, int idx, T val) {
+    *reinterpret_cast<T*>(tuple_mem->GetSlot(idx)) = val;
+  }
+
+  template <typename T, typename... Args>
+  void FillMem(Tuple* tuple_mem, int idx, T val, Args... args) {
+    *reinterpret_cast<T*>(tuple_mem->GetSlot(idx)) = val;
+    FillMem(tuple_mem, idx + sizeof(T), args...);
+  }
+
+  template <typename T, bool IS_FIRST_SLOT_NULL = false>
+  int CompareTest(T lval1, T lval2, T rval1, T rval2) {
+    TupleRow* lhs = CreateTupleRow<IS_FIRST_SLOT_NULL>(lval1, lval2);
+    TupleRow* rhs = CreateTupleRow(rval1, rval2);
+    int result = comperator_->Compare(lhs, rhs);
+    comperator_->Close(runtime_state_);
+    return result;
+  }
+
+  // With this function, nullable entries can also be tested, by setting the
+  // template parameter to true. This means that the first slot of the
+  // left hand side is set to null, which should be equal to the
+  // smallest possible value when comparing.
+  template <bool IS_FIRST_SLOT_NULL = false>
+  int IntIntTest(int32_t lval1, int32_t lval2, int32_t rval1, int32_t rval2) {
+    CreateComperator(ColumnType(TYPE_INT), ColumnType(TYPE_INT));
+    return CompareTest<int32_t, IS_FIRST_SLOT_NULL>(lval1, lval2, rval1, rval2);
+  }
+
+  int Int64Int64Test(int64_t lval1, int64_t lval2, int64_t rval1, int64_t rval2) {
+    CreateComperator(ColumnType(TYPE_BIGINT), ColumnType(TYPE_BIGINT));
+    return CompareTest<int64_t>(lval1, lval2, rval1, rval2);
+  }
+
+  int Int16Int16Test(int16_t lval1, int16_t lval2, int16_t rval1, int16_t rval2) {
+    CreateComperator(ColumnType(TYPE_SMALLINT), ColumnType(TYPE_SMALLINT));
+    return CompareTest<int16_t>(lval1, lval2, rval1, rval2);
+  }
+
+  int Int8Int8Test(int8_t lval1, int8_t lval2, int8_t rval1, int8_t rval2) {
+    CreateComperator(ColumnType(TYPE_TINYINT), ColumnType(TYPE_TINYINT));
+    return CompareTest<int8_t>(lval1, lval2, rval1, rval2);
+  }
+
+  int FloatFloatTest(float lval1, float lval2, float rval1, float rval2) {
+    CreateComperator(ColumnType(TYPE_FLOAT), ColumnType(TYPE_FLOAT));
+    return CompareTest<float>(lval1, lval2, rval1, rval2);
+  }
+
+  int DoubleDoubleTest(double lval1, double lval2, double rval1, double rval2) {
+    CreateComperator(ColumnType(TYPE_DOUBLE), ColumnType(TYPE_DOUBLE));
+    return CompareTest<double>(lval1, lval2, rval1, rval2);
+  }
+
+  int BoolBoolTest(bool lval1, bool lval2, bool rval1, bool rval2) {
+    CreateComperator(ColumnType(TYPE_BOOLEAN), ColumnType(TYPE_BOOLEAN));
+    return CompareTest<bool>(lval1, lval2, rval1, rval2);
+  }
+
+  // Char is a special case, so its tuples have to be created differently.
+  // This function is responsible for only the char test below, therefore
+  // the tuple will have a fix size of two slots.
+  TupleRow* CreateCharArrayTupleRow(const char* ptr1, const char* ptr2) {
+    uint8_t* tuple_row_mem = expr_perm_pool_.Allocate(
+        sizeof(char*) + sizeof(int32_t*) * 2);
+    Tuple* tuple_mem =
+        Tuple::Create(sizeof(char) + strlen(ptr1) + strlen(ptr2), &expr_perm_pool_);
+    memcpy(tuple_mem->GetSlot(1), ptr1, strlen(ptr1));
+    memcpy(tuple_mem->GetSlot(1 + strlen(ptr1)), ptr2, strlen(ptr2));
+    TupleRow* row = reinterpret_cast<TupleRow*>(tuple_row_mem);
+    row->SetTuple(0, tuple_mem);
+    return row;
+  }
+
+  int CharCharTest(const char *lval1, const char *lval2, const char *rval1,
+      const char *rval2) {
+    DCHECK_EQ(strlen(lval1), strlen(rval1));
+    DCHECK_EQ(strlen(lval2), strlen(rval2));
+    CreateComperator(ColumnType::CreateCharType(strlen(lval1)),
+        ColumnType::CreateCharType(strlen(lval2)));
+
+    TupleRow* lhs = CreateCharArrayTupleRow(lval1, lval2);
+    TupleRow* rhs = CreateCharArrayTupleRow(rval1, rval2);
+    int result = comperator_->Compare(lhs, rhs);
+    comperator_->Close(runtime_state_);
+    return result;
+  }
+
+  int DateDateTest(DateValue lval1, DateValue lval2, DateValue rval1, DateValue rval2) {
+    CreateComperator(ColumnType(TYPE_DATE), ColumnType(TYPE_DATE));
+    return CompareTest<DateValue>(lval1, lval2, rval1, rval2);
+  }
+
+  int TimestampTimestampTest(const std::string& lval1, const std::string& lval2,
+      const std::string& rval1, const std::string& rval2) {
+    return TimestampTimestampTest(
+        TimestampValue::ParseSimpleDateFormat(lval1),
+        TimestampValue::ParseSimpleDateFormat(lval2),
+        TimestampValue::ParseSimpleDateFormat(rval1),
+        TimestampValue::ParseSimpleDateFormat(rval2));
+  }
+
+  int TimestampTimestampTest(int64_t lval1, int64_t lval2, int64_t rval1, int64_t rval2) {
+    return TimestampTimestampTest(TimestampValue::FromDaysSinceUnixEpoch(lval1),
+        TimestampValue::FromDaysSinceUnixEpoch(lval2),
+        TimestampValue::FromDaysSinceUnixEpoch(rval1),
+        TimestampValue::FromDaysSinceUnixEpoch(rval2));
+  }
+
+  int TimestampTimestampTest(TimestampValue lval1, TimestampValue lval2,
+      TimestampValue rval1, TimestampValue rval2) {
+    CreateComperator(ColumnType(TYPE_TIMESTAMP), ColumnType(TYPE_TIMESTAMP));
+    return CompareTest<TimestampValue>(lval1, lval2, rval1, rval2);
+  }
+
+  template<typename DECIMAL_T>
+  int DecimalDecimalTest(int64_t lval1, int64_t lval2, int64_t rval1, int64_t rval2,
+      int precision, int scale) {
+    ColumnType decimal_column = ColumnType::CreateDecimalType(precision, scale);
+    CreateComperator(decimal_column, decimal_column);
+    bool overflow = false;
+    TupleRow* lhs = CreateTupleRow(DECIMAL_T::FromInt(precision, scale, lval1, &overflow),
+        DECIMAL_T::FromInt(precision, scale, lval2, &overflow));
+    TupleRow* rhs = CreateTupleRow(DECIMAL_T::FromInt(precision, scale, rval1, &overflow),
+        DECIMAL_T::FromInt(precision, scale, rval2, &overflow));
+    int result = comperator_->Compare(lhs, rhs);
+    comperator_->Close(runtime_state_);
+    return result;
+  }
+
+  // The number of bytes to compare in a string depends on the size of the types present
+  // in a row. If there are only strings present, only the first 4 bytes will be
+  // considered. This is decided in TupleRowZOrderComparator::CompareInterpreted, where
+  // the GetByteSize returns 0 for strings.
+  int StringString4ByteTest(const std::string& lval1, const std::string& lval2,
+      const std::string& rval1, const std::string& rval2) {
+    CreateComperator(ColumnType(TYPE_STRING), ColumnType(TYPE_STRING));
+    return CompareTest<StringValue>(StringValue(lval1), StringValue(lval2),
+        StringValue(rval1), StringValue(rval2));
+  }
+
+  // Requires that the comparator has already been created with the two String/Varchar
+  // columns and a column corresponding to T.
+  template<typename T>
+  int GenericStringTest(const std::string& lval1, const std::string& lval2,
+      const std::string& rval1, const std::string& rval2, T dummyValue)  {
+    TupleRow* lhs = CreateTupleRow(StringValue(lval1), StringValue(lval2), dummyValue);
+    TupleRow* rhs = CreateTupleRow(StringValue(rval1), StringValue(rval2), dummyValue);
+    int result = comperator_->Compare(lhs, rhs);
+    comperator_->Close(runtime_state_);
+    return result;
+  }
+
+  int StringString8ByteTest(const std::string& lval1, const std::string& lval2,
+      const std::string& rval1, const std::string& rval2) {
+    // TYPE_BIGINT will enforce checking the first 8 bytes of the strings.
+    CreateComperator(
+        ColumnType(TYPE_STRING), ColumnType(TYPE_STRING), ColumnType(TYPE_BIGINT));
+    int64_t dummyValue = 0;
+    return GenericStringTest<int64_t>(lval1, lval2, rval1, rval2, dummyValue);
+  }
+
+  int StringString16ByteTest(const std::string& lval1, const std::string& lval2,
+      const std::string& rval1, const std::string& rval2) {
+    // The Decimal16Value column will enforce checking the first 16 bytes of the strings.
+    int precision = ColumnType::MAX_PRECISION;
+    int scale = 0;
+    bool overflow = false;
+    Decimal16Value dummyValue = Decimal16Value::FromInt(precision, scale, 0, &overflow);
+    CreateComperator(ColumnType(TYPE_STRING), ColumnType(TYPE_STRING),
+        ColumnType::CreateDecimalType(precision, scale));
+    return GenericStringTest<Decimal16Value>(lval1, lval2, rval1, rval2, dummyValue);
+  }
+
+  int VarcharVarchar4ByteTest(const std::string& lval1, const std::string& lval2,
+      const std::string& rval1, const std::string& rval2) {
+    CreateComperator(ColumnType::CreateVarcharType(32),
+        ColumnType::CreateVarcharType(32));
+    return CompareTest<StringValue>(StringValue(lval1), StringValue(lval2),
+        StringValue(rval1), StringValue(rval2));
+  }
+
+  int VarcharVarchar8ByteTest(const std::string& lval1, const std::string& lval2,
+      const std::string& rval1, const std::string& rval2) {
+    // TYPE_BIGINT will enforce checking the first 8 bytes of the strings.
+    CreateComperator(ColumnType::CreateVarcharType(32),
+        ColumnType::CreateVarcharType(32), ColumnType(TYPE_BIGINT));
+    int64_t dummyValue = 0;
+    return GenericStringTest<Decimal16Value>(lval1, lval2, rval1, rval2, dummyValue);
+  }
+
+  int VarcharVarchar16ByteTest(const std::string& lval1, const std::string& lval2,
+      const std::string& rval1, const std::string& rval2) {
+    // The Decimal16Value column will enforce checking the first 16 bytes of the strings.
+    int precision = ColumnType::MAX_PRECISION;
+    int scale = 0;
+    bool overflow = false;
+    Decimal16Value dummyValue = Decimal16Value::FromInt(precision, scale, 0, &overflow);
+    CreateComperator(ColumnType::CreateVarcharType(32),
+        ColumnType::CreateVarcharType(32),
+        ColumnType::CreateDecimalType(precision, scale));
+    return GenericStringTest<Decimal16Value>(lval1, lval2, rval1, rval2, dummyValue);
+  }
+};
+
+// The Z-values used and their order are visualized in the following image:
+// https://en.wikipedia.org/wiki/File:Z-curve.svg
+TEST_F(TupleRowCompareTest, Int32Test) {
+  EXPECT_EQ(IntIntTest(0, 0, 0, 0), 0);
+  EXPECT_EQ(IntIntTest(-5, 3, -5, 3), 0);
+
+  EXPECT_EQ(IntIntTest(1, 0, 0, 1), 1);
+  EXPECT_EQ(IntIntTest(0, 1, 1, 0), -1);
+
+  EXPECT_EQ(IntIntTest(1, 0, 0, 1), 1);
+  EXPECT_EQ(IntIntTest(2, 4, 1, 7), 1);
+  EXPECT_EQ(IntIntTest(3, 7, 4, 0), -1);
+  EXPECT_EQ(IntIntTest(6, 4, 5, 7), 1);
+  EXPECT_EQ(IntIntTest(5, 5, 6, 4), -1);
+  EXPECT_EQ(IntIntTest(6, 1, 3, 7), 1);
+
+  EXPECT_EQ(IntIntTest(INT32_MAX / 2 + 2, 1, 1, INT32_MAX), 1);
+  EXPECT_EQ(IntIntTest(INT32_MAX / 2, 1, 1, INT32_MAX), -1);
+
+  // Some null tests (see details at IntIntTest)
+  EXPECT_EQ(IntIntTest<true>(1, 1, 1, 1), -1);
+  EXPECT_EQ(IntIntTest<true>(4242, 1, 1, 1), -1);
+  EXPECT_EQ(IntIntTest<true>(1, 0, 0, 1), -1);
+  EXPECT_EQ(IntIntTest<true>(1, 0, INT32_MIN, 0), 0);
+}
+
+TEST_F(TupleRowCompareTest, Int64Test) {
+  EXPECT_EQ(Int64Int64Test(0, 0, 0, 0), 0);
+  EXPECT_EQ(Int64Int64Test(-5, 3, -5, 3), 0);
+
+  EXPECT_EQ(Int64Int64Test(1, 0, 0, 1), 1);
+  EXPECT_EQ(Int64Int64Test(0, 1, 1, 0), -1);
+
+  EXPECT_EQ(Int64Int64Test(1, 0, 0, 1), 1);
+  EXPECT_EQ(Int64Int64Test(2, 4, 1, 7), 1);
+  EXPECT_EQ(Int64Int64Test(3, 7, 4, 0), -1);
+  EXPECT_EQ(Int64Int64Test(6, 4, 5, 7), 1);
+  EXPECT_EQ(Int64Int64Test(5, 5, 6, 4), -1);
+  EXPECT_EQ(Int64Int64Test(6, 1, 3, 7), 1);
+
+  EXPECT_EQ(Int64Int64Test(INT64_MAX / 2 + 2, 1, 1, INT64_MAX), 1);
+  EXPECT_EQ(Int64Int64Test(INT64_MAX / 2, 1, 1, INT64_MAX), -1);
+}
+
+TEST_F(TupleRowCompareTest, Int16Test) {
+  EXPECT_EQ(Int16Int16Test(0, 0, 0, 0), 0);
+  EXPECT_EQ(Int16Int16Test(-5, 3, -5, 3), 0);
+
+  EXPECT_EQ(Int16Int16Test(1, 0, 0, 1), 1);
+  EXPECT_EQ(Int64Int64Test(0, 1, 1, 0), -1);
+
+  EXPECT_EQ(Int16Int16Test(1, 0, 0, 1), 1);
+  EXPECT_EQ(Int16Int16Test(2, 4, 1, 7), 1);
+  EXPECT_EQ(Int16Int16Test(3, 7, 4, 0), -1);
+  EXPECT_EQ(Int16Int16Test(6, 4, 5, 7), 1);
+  EXPECT_EQ(Int16Int16Test(5, 5, 6, 4), -1);
+  EXPECT_EQ(Int16Int16Test(6, 1, 3, 7), 1);
+
+  EXPECT_EQ(Int16Int16Test(INT16_MAX / 2 + 2, 1, 1, INT16_MAX), 1);
+  EXPECT_EQ(Int16Int16Test(INT16_MAX / 2, 1, 1, INT16_MAX), -1);
+}
+
+TEST_F(TupleRowCompareTest, Int8Test) {
+  EXPECT_EQ(Int8Int8Test(0, 0, 0, 0), 0);
+  EXPECT_EQ(Int8Int8Test(-5, 3, -5, 3), 0);
+
+  EXPECT_EQ(Int8Int8Test(1, 0, 0, 1), 1);
+  EXPECT_EQ(Int8Int8Test(0, 1, 1, 0), -1);
+
+  EXPECT_EQ(Int8Int8Test(1, 0, 0, 1), 1);
+  EXPECT_EQ(Int8Int8Test(2, 4, 1, 7), 1);
+  EXPECT_EQ(Int8Int8Test(3, 7, 4, 0), -1);
+  EXPECT_EQ(Int8Int8Test(6, 4, 5, 7), 1);
+  EXPECT_EQ(Int8Int8Test(5, 5, 6, 4), -1);
+  EXPECT_EQ(Int8Int8Test(6, 1, 3, 7), 1);
+
+  EXPECT_EQ(Int8Int8Test(INT8_MAX / 2 + 2, 1, 1, INT8_MAX), 1);
+  EXPECT_EQ(Int8Int8Test(INT8_MAX / 2, 1, 1, INT8_MAX), -1);
+}
+
+TEST_F(TupleRowCompareTest, FloatTest) {
+  EXPECT_EQ(FloatFloatTest(1.0f, 0.0f, 0.0f, 1.0f), 1);
+  EXPECT_EQ(FloatFloatTest(0.0f, 1.0f, 1.0f, 0.0f), -1);
+
+  EXPECT_EQ(FloatFloatTest(4.0f, 3.0f, 3.0f, 4.0f), 1);
+  EXPECT_EQ(FloatFloatTest(5.0f, 7.0f, 4.0f, 10.0f), -1);
+  EXPECT_EQ(FloatFloatTest(6.0f, 10.0f, 7.0f, 3.0f), 1);
+  EXPECT_EQ(FloatFloatTest(9.0f, 7.0f, 8.0f, 10.0f), -1);
+  EXPECT_EQ(FloatFloatTest(8.0f , 8.0f, 9.0f, 7.0f), 1);
+  EXPECT_EQ(FloatFloatTest(9.0f, 4.0f, 6.0f, 10.0f), 1);
+
+  EXPECT_EQ(FloatFloatTest(-4.0f, -3.0f, -3.0f, -4.0f), -1);
+  EXPECT_EQ(FloatFloatTest(-5.0f, -7.0f, -4.0f, -10.0f), 1);
+  EXPECT_EQ(FloatFloatTest(-6.0f, -10.0f, -7.0f, -3.0f), -1);
+  EXPECT_EQ(FloatFloatTest(-9.0f, -7.0f, -8.0f, -10.0f), 1);
+  EXPECT_EQ(FloatFloatTest(-8.0f, -8.0f, -9.0f, -7.0f), -1);
+  EXPECT_EQ(FloatFloatTest(-9.0f, -4.0f, -6.0f, -10.0f), -1);
+
+  EXPECT_EQ(FloatFloatTest(FLT_MAX / 2.0f + 2.0f, 1.0f, 1.0f, FLT_MAX), 1);
+}
+
+TEST_F(TupleRowCompareTest, DoubleTest) {
+  EXPECT_EQ(DoubleDoubleTest(1.0, 0.0, 0.0, 1.0f), 1);
+  EXPECT_EQ(DoubleDoubleTest(0.0, 1.0, 1.0, 0.0f), -1);
+
+  EXPECT_EQ(DoubleDoubleTest(4.0, 3.0, 3.0, 4.0), 1);
+  EXPECT_EQ(DoubleDoubleTest(5.0, 7.0, 4.0, 10.0), -1);
+  EXPECT_EQ(DoubleDoubleTest(6.0, 10.0, 7.0, 3.0), 1);
+  EXPECT_EQ(DoubleDoubleTest(9.0, 7.0, 8.0, 10.0), -1);
+  EXPECT_EQ(DoubleDoubleTest(8.0, 8.0, 9.0, 7.0), 1);
+  EXPECT_EQ(DoubleDoubleTest(9.0, 4.0, 6.0, 10.0), 1);
+
+  EXPECT_EQ(DoubleDoubleTest(-4.0, -3.0, -3.0, -4.0), -1);
+  EXPECT_EQ(DoubleDoubleTest(-5.0, -7.0, -4.0, -10.0), 1);
+  EXPECT_EQ(DoubleDoubleTest(-6.0, -10.0, -7.0, -3.0), -1);
+  EXPECT_EQ(DoubleDoubleTest(-9.0, -7.0, -8.0, -10.0), 1);
+  EXPECT_EQ(DoubleDoubleTest(-8.0, -8.0, -9.0, -7.0), -1);
+  EXPECT_EQ(DoubleDoubleTest(-9.0, -4.0, -6.0, -10.0), -1);
+
+  EXPECT_EQ(DoubleDoubleTest(DBL_MAX / 2.0 + 2.0, 1.0, 1.0, DBL_MAX), 1);
+}
+
+TEST_F(TupleRowCompareTest, BoolTest) {
+  EXPECT_EQ(BoolBoolTest(true, false, true, false), 0);
+  EXPECT_EQ(BoolBoolTest(false, true, false, true), 0);
+
+  EXPECT_EQ(BoolBoolTest(true, true, true, false), 1);
+  EXPECT_EQ(BoolBoolTest(false, true, true, true), -1);
+  EXPECT_EQ(BoolBoolTest(false, true, false, false), 1);
+  EXPECT_EQ(BoolBoolTest(false, false, false, true), -1);
+  EXPECT_EQ(BoolBoolTest(true, false, false, false), 1);
+}
+
+TEST_F(TupleRowCompareTest, CharTest) {
+  EXPECT_EQ(CharCharTest("a", "b", "a", "b"), 0);
+  EXPECT_EQ(CharCharTest("a", "b", "a", "b"), 0);
+  EXPECT_EQ(CharCharTest("h", "0", "h", "0"), 0);
+
+  EXPECT_EQ(CharCharTest("h", "z", "z", "h"), -1);
+  EXPECT_EQ(CharCharTest("a", "0", "h", "0"), -1);
+  EXPECT_EQ(CharCharTest("!", "{", "0", "K"), 1);
+  EXPECT_EQ(CharCharTest("A", "~", "B", "Z"), 1);
+
+  EXPECT_EQ(CharCharTest("aaa", "bbb", "aaa", "bbb"), 0);
+  EXPECT_EQ(CharCharTest("abc", "bbc", "abc", "bbc"), 0);
+  EXPECT_EQ(CharCharTest("aah", "aa0", "aah", "aa0"), 0);
+
+  EXPECT_EQ(CharCharTest("aaa", "aa0", "aah", "aa0"), -1);
+  EXPECT_EQ(CharCharTest("aah", "aaz", "aaz", "aah"), -1);
+  EXPECT_EQ(CharCharTest("aa!", "aa{", "aa0", "aaK"), 1);
+  EXPECT_EQ(CharCharTest("aaA", "aa~", "aaB", "aaZ"), 1);
+}
+
+TEST_F(TupleRowCompareTest, DateTest) {
+  EXPECT_EQ(Int8Int8Test(0, 0, 0, 0), 0);
+  EXPECT_EQ(Int8Int8Test(-5, 3, -5, 3), 0);
+
+  EXPECT_EQ(Int8Int8Test(1, 0, 0, 1), 1);
+  EXPECT_EQ(Int8Int8Test(0, 1, 1, 0), -1);
+
+  EXPECT_EQ(Int8Int8Test(1, 0, 0, 1), 1);
+  EXPECT_EQ(Int8Int8Test(2, 4, 1, 7), 1);
+  EXPECT_EQ(Int8Int8Test(3, 7, 4, 0), -1);
+  EXPECT_EQ(Int8Int8Test(6, 4, 5, 7), 1);
+  EXPECT_EQ(Int8Int8Test(5, 5, 6, 4), -1);
+  EXPECT_EQ(Int8Int8Test(6, 1, 3, 7), 1);
+
+  EXPECT_EQ(Int8Int8Test(INT8_MAX / 2 + 2, 1, 1, INT8_MAX), 1);
+  EXPECT_EQ(Int8Int8Test(INT8_MAX / 2, 1, 1, INT8_MAX), -1);
+}
+
+TEST_F(TupleRowCompareTest, TimestampTest) {
+  EXPECT_EQ(TimestampTimestampTest(
+      "2015-04-09 14:07:46.580465000", "2015-04-09 14:07:46.580465000",
+      "2015-04-09 14:07:46.580465000", "2015-04-09 14:07:46.580465000"), 0);
+  EXPECT_EQ(TimestampTimestampTest(
+      "1415-12-09 10:07:44.314159265", "2015-04-09 14:07:46.580465000",
+      "1415-12-09 10:07:44.314159265", "2015-04-09 14:07:46.580465000"), 0);
+
+  EXPECT_EQ(TimestampTimestampTest(1, 0, 0, 1), 1);
+  EXPECT_EQ(TimestampTimestampTest(0, 1, 1, 0), -1);
+
+  EXPECT_EQ(TimestampTimestampTest(1, 0, 0, 1), 1);
+  EXPECT_EQ(TimestampTimestampTest(2, 4, 1, 7), 1);
+  EXPECT_EQ(TimestampTimestampTest(3, 7, 4, 0), -1);
+  EXPECT_EQ(TimestampTimestampTest(6, 4, 5, 7), 1);
+  EXPECT_EQ(TimestampTimestampTest(5, 5, 6, 4), -1);
+  EXPECT_EQ(TimestampTimestampTest(6, 1, 3, 7), 1);
+
+  EXPECT_EQ(TimestampTimestampTest(
+      "1400-01-01 00:00:00.000000000", "9999-12-31 14:07:46.580465000",
+      "8000-12-09 10:07:44.314159265", "2015-04-09 14:07:46.580465000"), -1);
+
+  EXPECT_EQ(TimestampTimestampTest(
+      "1400-01-01 00:00:00.000000001", "1400-01-01 00:00:00.000000000",
+      "1400-01-01 00:00:00.000000000", "1400-01-01 00:00:00.000000001"), 1);
+  EXPECT_EQ(TimestampTimestampTest(
+      "1400-01-01 00:00:00.000000003", "1400-01-01 00:00:00.000000007",
+      "1400-01-01 00:00:00.000000004", "1400-01-01 00:00:00.000000000"), -1);
+  EXPECT_EQ(TimestampTimestampTest(
+      "1400-01-01 00:00:00.000000006", "1400-01-01 00:00:00.000000004",
+      "1400-01-01 00:00:00.000000005", "1400-01-01 00:00:00.000000007"), 1);
+  EXPECT_EQ(TimestampTimestampTest(
+      "1400-01-01 00:00:00.000000005", "1400-01-01 00:00:00.000000005",
+      "1400-01-01 00:00:00.000000006", "1400-01-01 00:00:00.000000004"), -1);
+
+  EXPECT_EQ(TimestampTimestampTest(
+      "1400-01-01 23:59:59.999999999", "1400-01-01 00:00:00.000000000",
+      "1400-01-02 00:00:00.000000000", "1400-01-01 00:00:00.000000000"), -1);
+  EXPECT_EQ(TimestampTimestampTest(
+      "3541-11-03 23:59:59.999999999", "3541-11-03 00:00:00.000000000",
+      "3541-11-04 00:00:00.000000000", "3541-11-03 00:00:00.000000000"), -1);
+}
+
+
+TEST_F(TupleRowCompareTest, DecimalTest) {
+  EXPECT_EQ(DecimalDecimalTest<Decimal4Value>(1, 1, 1, 1, 4, 2), 0);
+  EXPECT_EQ(DecimalDecimalTest<Decimal4Value>(-5, 3, -5, 3, 2, 1), 0);
+
+  EXPECT_EQ(DecimalDecimalTest<Decimal4Value>(1, 0, 0, 1, 1, 0), 1);
+  EXPECT_EQ(DecimalDecimalTest<Decimal4Value>(0, 1, 1, 0, 1, 0), -1);
+
+  EXPECT_EQ(DecimalDecimalTest<Decimal4Value>(256, 10, 255, 100, 4, 2), 1);
+  EXPECT_EQ(DecimalDecimalTest<Decimal4Value>(3, 1024, 128, 1023, 9, 1), 1);
+  EXPECT_EQ(DecimalDecimalTest<Decimal4Value>(1024, 511, 1023, 0, 5, 0), 1);
+  EXPECT_EQ(DecimalDecimalTest<Decimal4Value>(5550, 0, 5000, 4097, 9, 3), -1);
+
+
+  EXPECT_EQ(DecimalDecimalTest<Decimal8Value>(1, 1, 1, 1, 4, 2), 0);
+  EXPECT_EQ(DecimalDecimalTest<Decimal8Value>(-5, 3, -5, 3, 2, 1), 0);
+
+  EXPECT_EQ(DecimalDecimalTest<Decimal8Value>(1, 0, 0, 1, 1, 0), 1);
+  EXPECT_EQ(DecimalDecimalTest<Decimal8Value>(0, 1, 1, 0, 1, 0), -1);
+
+  EXPECT_EQ(DecimalDecimalTest<Decimal8Value>(256, 10, 255, 100, 18, 0), 1);
+  EXPECT_EQ(DecimalDecimalTest<Decimal8Value>(3, 1024, 128, 1023, 18, 1), 1);
+  EXPECT_EQ(DecimalDecimalTest<Decimal8Value>(1024, 511, 1023, 0, 18, 0), 1);
+  EXPECT_EQ(DecimalDecimalTest<Decimal8Value>(5550, 0, 5000, 4097, 18, 3), -1);
+
+
+  EXPECT_EQ(DecimalDecimalTest<Decimal16Value>(1, 1, 1, 1, 4, 2), 0);
+  EXPECT_EQ(DecimalDecimalTest<Decimal16Value>(-5, 3, -5, 3, 2, 1), 0);
+
+  EXPECT_EQ(DecimalDecimalTest<Decimal16Value>(1, 0, 0, 1, 1, 0), 1);
+  EXPECT_EQ(DecimalDecimalTest<Decimal16Value>(0, 1, 1, 0, 1, 0), -1);
+
+  EXPECT_EQ(DecimalDecimalTest<Decimal16Value>(256, 10, 255, 100,
+      ColumnType::MAX_PRECISION, 0), 1);
+  EXPECT_EQ(DecimalDecimalTest<Decimal16Value>(3, 1024, 128, 1023,
+      ColumnType::MAX_PRECISION, 0), 1);
+  EXPECT_EQ(DecimalDecimalTest<Decimal16Value>(1024, 511, 1023, 0,
+      ColumnType::MAX_PRECISION, 0), 1);
+  EXPECT_EQ(DecimalDecimalTest<Decimal16Value>(5550, 0, 5000, 4097,
+      ColumnType::MAX_PRECISION, 0), -1);
+}
+
+TEST_F(TupleRowCompareTest, AllTypeTest)  {
+  int precision = 9, scale = 0;
+  bool overflow = false;
+  CreateComperator(ColumnType(TYPE_TIMESTAMP), ColumnType(TYPE_BOOLEAN),
+      ColumnType(TYPE_TINYINT), ColumnType(TYPE_BIGINT), ColumnType::CreateCharType(1),
+      ColumnType(ColumnType::CreateDecimalType(precision, scale)));
+
+  TupleRow* lhs = CreateTupleRow(
+      TimestampValue::ParseSimpleDateFormat("1400-01-01 00:00:00.000000000"),
+      false, static_cast<int8_t>(0), static_cast<int64_t>(0), '~',
+      Decimal4Value::FromInt(precision, scale, 0, &overflow));
+  TupleRow* rhs = CreateTupleRow(
+      TimestampValue::ParseSimpleDateFormat("1400-01-01 00:00:00.000000000"),
+      false, static_cast<int8_t>(0), static_cast<int64_t>(0), '~',
+      Decimal4Value::FromInt(precision, scale, 0, &overflow));
+  EXPECT_EQ(comperator_->Compare(lhs, rhs), 0);
+
+  lhs = CreateTupleRow(
+      TimestampValue::ParseSimpleDateFormat("1400-01-01 00:00:00.000000002"),
+      false, static_cast<int8_t>(1), static_cast<int64_t>(1), '~',
+      Decimal4Value::FromInt(precision, scale, 1, &overflow));
+  rhs = CreateTupleRow(
+      TimestampValue::ParseSimpleDateFormat("1400-01-01 00:00:00.000000001"),
+      false, static_cast<int8_t>(1), static_cast<int64_t>(1), '~',
+      Decimal4Value::FromInt(precision, scale, 1, &overflow));
+  EXPECT_EQ(comperator_->Compare(lhs, rhs), 1);
+
+  lhs = CreateTupleRow(
+      TimestampValue::ParseSimpleDateFormat("1400-01-01 00:00:00.000000002"),
+      false, static_cast<int8_t>(1), static_cast<int64_t>(1), '~',
+      Decimal4Value::FromInt(precision, scale, 1, &overflow));
+  rhs = CreateTupleRow(
+      TimestampValue::ParseSimpleDateFormat("1400-01-01 00:00:00.000000001"),
+      false, static_cast<int8_t>(42), static_cast<int64_t>(1), '~',
+      Decimal4Value::FromInt(precision, scale, 1, &overflow));
+  EXPECT_EQ(comperator_->Compare(lhs, rhs), -1);
+
+  // Checking the dominance of types (bigger types should not dominate smaller ones)
+  lhs = CreateTupleRow(
+      TimestampValue::ParseSimpleDateFormat("1400-01-01 00:00:00.000000000"),
+      true, static_cast<int8_t>(0), static_cast<int64_t>(0), '~',
+      Decimal4Value::FromInt(precision, scale, 0, &overflow));
+  rhs = CreateTupleRow(
+      TimestampValue::ParseSimpleDateFormat("1400-01-01 23:59:59.999999999"),
+      false, static_cast<int8_t>(0), static_cast<int64_t>(0), '~',
+      Decimal4Value::FromInt(precision, scale, 0, &overflow));
+  EXPECT_EQ(comperator_->Compare(lhs, rhs), 1);
+
+  lhs = CreateTupleRow(
+      TimestampValue::ParseSimpleDateFormat("1400-01-01 00:00:00.000000000"),
+      false, static_cast<int8_t>(0), static_cast<int64_t>(1) << 5, '~',
+      Decimal4Value::FromInt(precision, scale, 0, &overflow));
+  rhs = CreateTupleRow(
+      TimestampValue::ParseSimpleDateFormat("1400-01-01 23:59:59.999999999"),
+      false, static_cast<int8_t>(0), static_cast<int64_t>(0), '~',
+      Decimal4Value::FromInt(precision, scale, 0, &overflow));
+  EXPECT_EQ(comperator_->Compare(lhs, rhs), 1);
+
+  lhs = CreateTupleRow(
+      TimestampValue::ParseSimpleDateFormat("1400-01-01 00:00:00.000000000"),
+      false, static_cast<int8_t>(0), static_cast<int64_t>(1) << 5, '~',
+      Decimal4Value::FromInt(precision, scale, 0, &overflow));
+  rhs = CreateTupleRow(
+      TimestampValue::ParseSimpleDateFormat("1400-01-01 00:00:00.000000000"),
+      false, static_cast<int8_t>(1), static_cast<int64_t>(0), '~',
+      Decimal4Value::FromInt(precision, scale, 0, &overflow));
+  EXPECT_EQ(comperator_->Compare(lhs, rhs), -1);
+
+  comperator_->Close(runtime_state_);
+}
+
+// Without converting into 128-bit representation, only 64-bit comparison
+TEST_F(TupleRowCompareTest, All64TypeTest)  {
+  int precision = 9, scale = 0;
+  bool overflow = false;
+  CreateComperator(ColumnType(TYPE_BOOLEAN), ColumnType(TYPE_TINYINT),
+      ColumnType(TYPE_SMALLINT), ColumnType(TYPE_INT),
+      ColumnType(TYPE_BIGINT), ColumnType::CreateCharType(1),
+      ColumnType(ColumnType::CreateDecimalType(precision, scale)));
+
+  TupleRow* lhs = CreateTupleRow(
+      false, static_cast<int8_t>(0), static_cast<int16_t>(0), static_cast<int32_t>(0),
+      static_cast<int64_t>(0), '~',
+      Decimal4Value::FromInt(precision, scale, 0, &overflow));
+  TupleRow* rhs = CreateTupleRow(
+      false, static_cast<int8_t>(0), static_cast<int16_t>(0), static_cast<int32_t>(0),
+      static_cast<int64_t>(0), '~',
+      Decimal4Value::FromInt(precision, scale, 0, &overflow));
+  EXPECT_EQ(comperator_->Compare(lhs, rhs), 0);
+
+  lhs = CreateTupleRow(
+        false, static_cast<int8_t>(1), static_cast<int16_t>(1), static_cast<int32_t>(1),
+        static_cast<int64_t>(1), 'z',
+        Decimal4Value::FromInt(precision, scale, 1, &overflow));
+  rhs = CreateTupleRow(
+        false, static_cast<int8_t>(1), static_cast<int16_t>(1), static_cast<int32_t>(1),
+        static_cast<int64_t>(1), 'a',
+        Decimal4Value::FromInt(precision, scale, 1, &overflow));
+  EXPECT_EQ(comperator_->Compare(lhs, rhs), 1);
+
+  lhs = CreateTupleRow(
+        false, static_cast<int8_t>(1), static_cast<int16_t>(1), static_cast<int32_t>(1),
+        static_cast<int64_t>(1), '~',
+        Decimal4Value::FromInt(precision, scale, 1, &overflow));
+  rhs = CreateTupleRow(
+        false, static_cast<int8_t>(1), static_cast<int16_t>(56), static_cast<int32_t>(1),
+        static_cast<int64_t>(1), '~',
+        Decimal4Value::FromInt(precision, scale, 1, &overflow));
+  EXPECT_EQ(comperator_->Compare(lhs, rhs), -1);
+
+  comperator_->Close(runtime_state_);
+}
+
+// Without converting into 64-bit representation, only 32-bit comparison
+TEST_F(TupleRowCompareTest, All32TypeTest)  {
+  int precision = 9, scale = 0;
+  bool overflow = false;
+  CreateComperator(ColumnType(TYPE_BOOLEAN), ColumnType(TYPE_TINYINT),
+      ColumnType(TYPE_SMALLINT), ColumnType(TYPE_INT), ColumnType::CreateCharType(1),
+      ColumnType(ColumnType::CreateDecimalType(precision, scale)));
+
+  TupleRow* lhs = CreateTupleRow(
+      false, static_cast<int8_t>(0), static_cast<int16_t>(0), static_cast<int32_t>(0),
+      '~', Decimal4Value::FromInt(precision, scale, 0, &overflow));
+  TupleRow* rhs = CreateTupleRow(
+      false, static_cast<int8_t>(0), static_cast<int16_t>(0), static_cast<int32_t>(0),
+      '~', Decimal4Value::FromInt(precision, scale, 0, &overflow));
+  EXPECT_EQ(comperator_->Compare(lhs, rhs), 0);
+
+  lhs = CreateTupleRow(
+        false, static_cast<int8_t>(1), static_cast<int16_t>(1), static_cast<int32_t>(1),
+        '~', Decimal4Value::FromInt(precision, scale, 312, &overflow));
+  rhs = CreateTupleRow(
+        false, static_cast<int8_t>(1), static_cast<int16_t>(1), static_cast<int32_t>(1),
+        '~', Decimal4Value::FromInt(precision, scale, 1, &overflow));
+  EXPECT_EQ(comperator_->Compare(lhs, rhs), 1);
+
+  lhs = CreateTupleRow(
+        false, static_cast<int8_t>(1), static_cast<int16_t>(1), static_cast<int32_t>(1),
+        '~', Decimal4Value::FromInt(precision, scale, 1, &overflow));
+  rhs = CreateTupleRow(
+        false, static_cast<int8_t>(1), static_cast<int16_t>(1), static_cast<int32_t>(56),
+        '~', Decimal4Value::FromInt(precision, scale, 1, &overflow));
+  EXPECT_EQ(comperator_->Compare(lhs, rhs), -1);
+
+  comperator_->Close(runtime_state_);
+}
+
+TEST_F(TupleRowCompareTest, StringTest) {
+  EXPECT_EQ(StringString4ByteTest("hello", "hello", "hello", "hello"), 0);
+  EXPECT_EQ(StringString4ByteTest(std::string(255, 'a'), std::string(255, 'a'),
+      std::string(255, 'a'), std::string(255, 'a')), 0);
+  EXPECT_EQ(StringString4ByteTest(std::string(2550, 'a'), std::string(2550, 'a'),
+      std::string(2550, 'a'), std::string(2550, 'a')), 0);
+
+  EXPECT_EQ(StringString4ByteTest("hello1", "hello2", "hello3", "hello4"), 0);
+  EXPECT_EQ(StringString4ByteTest("2", "h", "2", "h2"), -1);
+  EXPECT_EQ(StringString4ByteTest("", "h", "", ""), 1);
+  EXPECT_EQ(StringString4ByteTest("ab", "cd", "aw", "ca"), -1);
+  EXPECT_EQ(StringString4ByteTest("ab", "yd", "ac", "ca"), 1);
+  EXPECT_EQ(StringString4ByteTest("zz", "ydz", "a", "caaa"), 1);
+
+  EXPECT_EQ(StringString8ByteTest("hello1", "hello2", "hello3", "hello4"), -1);
+  EXPECT_EQ(StringString8ByteTest("12345678a", "12345678b",
+      "12345678c", "12345678d"), 0);
+  EXPECT_EQ(StringString8ByteTest("aa", "bbbbbbbb", "aa", "bbbbbbba"), 1);
+  EXPECT_EQ(StringString8ByteTest("2", "h", "2", "h2"), -1);
+  EXPECT_EQ(StringString8ByteTest("", "h", "", ""), 1);
+  EXPECT_EQ(StringString8ByteTest("ab", "cd", "aw", "ca"), -1);
+  EXPECT_EQ(StringString8ByteTest("ab", "yd", "ac", "ca"), 1);
+  EXPECT_EQ(StringString8ByteTest("zz", "ydz", "a", "caaa"), 1);
+
+  EXPECT_EQ(StringString16ByteTest("12345678a", "12345678b",
+      "12345678c", "12345678d"), -1);
+  EXPECT_EQ(StringString16ByteTest("1234567812345678a", "1234567812345678b",
+      "1234567812345678c", "1234567812345678d"), 0);
+  EXPECT_EQ(StringString16ByteTest("aa", "bbbbbbbb", "aa", "bbbbbbba"), 1);
+  EXPECT_EQ(StringString16ByteTest("aa", "1234567812345678",
+      "aa", "1234567812345679"), -1);
+  EXPECT_EQ(StringString16ByteTest("2", "h", "2", "h2"), -1);
+  EXPECT_EQ(StringString16ByteTest("", "h", "", ""), 1);
+  EXPECT_EQ(StringString16ByteTest("ab", "cd", "aw", "ca"), -1);
+  EXPECT_EQ(StringString16ByteTest("ab", "yd", "ac", "ca"), 1);
+  EXPECT_EQ(StringString16ByteTest("zz", "ydz", "a", "caaa"), 1);
+}
+
+TEST_F(TupleRowCompareTest, VarcharTest) {
+  EXPECT_EQ(VarcharVarchar4ByteTest("hello", "hello", "hello", "hello"), 0);
+
+  EXPECT_EQ(VarcharVarchar4ByteTest("hello", "hello", "hello", "hello"), 0);
+  EXPECT_EQ(VarcharVarchar4ByteTest(std::string(255, 'a'), std::string(255, 'a'),
+      std::string(255, 'a'), std::string(255, 'a')), 0);
+  EXPECT_EQ(VarcharVarchar4ByteTest(std::string(2550, 'a'), std::string(2550, 'a'),
+      std::string(2550, 'a'), std::string(2550, 'a')), 0);
+
+  EXPECT_EQ(VarcharVarchar4ByteTest("hello1", "hello2", "hello3", "hello4"), 0);
+  EXPECT_EQ(VarcharVarchar4ByteTest("2", "h", "2", "h2"), -1);
+  EXPECT_EQ(VarcharVarchar4ByteTest("", "h", "", ""), 1);
+  EXPECT_EQ(VarcharVarchar4ByteTest("ab", "cd", "aw", "ca"), -1);
+  EXPECT_EQ(VarcharVarchar4ByteTest("ab", "yd", "ac", "ca"), 1);
+  EXPECT_EQ(VarcharVarchar4ByteTest("zz", "ydz", "a", "caaa"), 1);
+
+  EXPECT_EQ(VarcharVarchar8ByteTest("hello1", "hello2", "hello3", "hello4"), -1);
+  EXPECT_EQ(VarcharVarchar8ByteTest("12345678a", "12345678b",
+      "12345678c", "12345678d"), 0);
+  EXPECT_EQ(VarcharVarchar8ByteTest("aa", "bbbbbbbb", "aa", "bbbbbbba"), 1);
+  EXPECT_EQ(VarcharVarchar8ByteTest("2", "h", "2", "h2"), -1);
+  EXPECT_EQ(VarcharVarchar8ByteTest("", "h", "", ""), 1);
+  EXPECT_EQ(VarcharVarchar8ByteTest("ab", "cd", "aw", "ca"), -1);
+  EXPECT_EQ(VarcharVarchar8ByteTest("ab", "yd", "ac", "ca"), 1);
+  EXPECT_EQ(VarcharVarchar8ByteTest("zz", "ydz", "a", "caaa"), 1);
+
+  EXPECT_EQ(VarcharVarchar16ByteTest("12345678a", "12345678b",
+      "12345678c", "12345678d"), -1);
+  EXPECT_EQ(VarcharVarchar16ByteTest("1234567812345678a", "1234567812345678b",
+      "1234567812345678c", "1234567812345678d"), 0);
+  EXPECT_EQ(VarcharVarchar16ByteTest("aa", "bbbbbbbb", "aa", "bbbbbbba"), 1);
+  EXPECT_EQ(VarcharVarchar16ByteTest("aa", "1234567812345678",
+      "aa", "1234567812345679"), -1);
+  EXPECT_EQ(VarcharVarchar16ByteTest("2", "h", "2", "h2"), -1);
+  EXPECT_EQ(VarcharVarchar16ByteTest("", "h", "", ""), 1);
+  EXPECT_EQ(VarcharVarchar16ByteTest("ab", "cd", "aw", "ca"), -1);
+  EXPECT_EQ(VarcharVarchar16ByteTest("ab", "yd", "ac", "ca"), 1);
+  EXPECT_EQ(VarcharVarchar16ByteTest("zz", "ydz", "a", "caaa"), 1);
+}
+
+} //namespace impala
+
diff --git a/be/src/util/tuple-row-compare.cc b/be/src/util/tuple-row-compare.cc
index f424eb4..494b507 100644
--- a/be/src/util/tuple-row-compare.cc
+++ b/be/src/util/tuple-row-compare.cc
@@ -24,7 +24,9 @@
 #include "exprs/scalar-expr.h"
 #include "exprs/scalar-expr-evaluator.h"
 #include "runtime/runtime-state.h"
+#include "runtime/multi-precision.h"
 #include "util/runtime-profile-counters.h"
+#include "util/bit-util.h"
 
 using namespace impala;
 using namespace strings;
@@ -50,7 +52,17 @@ void TupleRowComparator::Close(RuntimeState* state) {
   ScalarExprEvaluator::Close(ordering_expr_evals_lhs_, state);
 }
 
-int TupleRowComparator::CompareInterpreted(
+Status TupleRowComparator::Codegen(RuntimeState* state) {
+  llvm::Function* fn;
+  LlvmCodeGen* codegen = state->codegen();
+  DCHECK(codegen != NULL);
+  RETURN_IF_ERROR(CodegenCompare(codegen, &fn));
+  codegend_compare_fn_ = state->obj_pool()->Add(new CompareFn);
+  codegen->AddFunctionToJit(fn, reinterpret_cast<void**>(codegend_compare_fn_));
+  return Status::OK();
+}
+
+int TupleRowLexicalComparator::CompareInterpreted(
     const TupleRow* lhs, const TupleRow* rhs) const {
   DCHECK_EQ(ordering_exprs_.size(), ordering_expr_evals_lhs_.size());
   DCHECK_EQ(ordering_expr_evals_lhs_.size(), ordering_expr_evals_rhs_.size());
@@ -71,16 +83,6 @@ int TupleRowComparator::CompareInterpreted(
   return 0; // fully equivalent key
 }
 
-Status TupleRowComparator::Codegen(RuntimeState* state) {
-  llvm::Function* fn;
-  LlvmCodeGen* codegen = state->codegen();
-  DCHECK(codegen != NULL);
-  RETURN_IF_ERROR(CodegenCompare(codegen, &fn));
-  codegend_compare_fn_ = state->obj_pool()->Add(new CompareFn);
-  codegen->AddFunctionToJit(fn, reinterpret_cast<void**>(codegend_compare_fn_));
-  return Status::OK();
-}
-
 // Codegens an unrolled version of Compare(). Uses codegen'd key exprs and injects
 // nulls_first_ and is_asc_ values.
 //
@@ -202,7 +204,8 @@ Status TupleRowComparator::Codegen(RuntimeState* state) {
 // next_key2:                                        ; preds = %rhs_non_null12, %next_key
 //   ret i32 0
 // }
-Status TupleRowComparator::CodegenCompare(LlvmCodeGen* codegen, llvm::Function** fn) {
+Status TupleRowLexicalComparator::CodegenCompare(LlvmCodeGen* codegen,
+    llvm::Function** fn) {
   llvm::LLVMContext& context = codegen->context();
   const vector<ScalarExpr*>& ordering_exprs = ordering_exprs_;
   llvm::Function* key_fns[ordering_exprs.size()];
@@ -307,3 +310,154 @@ Status TupleRowComparator::CodegenCompare(LlvmCodeGen* codegen, llvm::Function**
   }
   return Status::OK();
 }
+
+int TupleRowZOrderComparator::CompareInterpreted(const TupleRow* lhs,
+    const TupleRow* rhs) const {
+  DCHECK_EQ(ordering_exprs_.size(), ordering_expr_evals_lhs_.size());
+  DCHECK_EQ(ordering_expr_evals_lhs_.size(), ordering_expr_evals_rhs_.size());
+
+  // The algorithm requires all values having a common type, without loss of data.
+  // This means we have to find the biggest type.
+  int max_size = ordering_exprs_[0]->type().GetByteSize();
+  for (int i = 1; i < ordering_exprs_.size(); ++i) {
+    if (ordering_exprs_[i]->type().GetByteSize() > max_size) {
+      max_size = ordering_exprs_[i]->type().GetByteSize();
+    }
+  }
+  if (max_size <= 4) {
+    return CompareBasedOnSize<uint32_t>(lhs, rhs);
+  } else if (max_size <= 8) {
+    return CompareBasedOnSize<uint64_t>(lhs, rhs);
+  } else {
+    return CompareBasedOnSize<uint128_t>(lhs, rhs);
+  }
+}
+
+template<typename U>
+int TupleRowZOrderComparator::CompareBasedOnSize(const TupleRow* lhs,
+    const TupleRow* rhs) const {
+  auto less_msb = [](U x, U y) { return x < y && x < (x ^ y); };
+  ColumnType type = ordering_exprs_[0]->type();
+  // Values of the most significant dimension from both sides.
+  U msd_lhs = GetSharedRepresentation<U>(ordering_expr_evals_lhs_[0]->GetValue(lhs),
+      type);
+  U msd_rhs = GetSharedRepresentation<U>(ordering_expr_evals_rhs_[0]->GetValue(rhs),
+      type);
+  for (int i = 1; i < ordering_exprs_.size(); ++i) {
+    type = ordering_exprs_[i]->type();
+    void* lhs_v = ordering_expr_evals_lhs_[i]->GetValue(lhs);
+    void* rhs_v = ordering_expr_evals_rhs_[i]->GetValue(rhs);
+
+    U lhsi = GetSharedRepresentation<U>(lhs_v, type);
+    U rhsi = GetSharedRepresentation<U>(rhs_v, type);
+
+    if (less_msb(msd_lhs ^ msd_rhs, lhsi ^ rhsi)) {
+      msd_lhs = lhsi;
+      msd_rhs = rhsi;
+    }
+  }
+  return msd_lhs < msd_rhs ? -1 : (msd_lhs > msd_rhs ? 1 : 0);
+}
+
+template <typename U>
+U TupleRowZOrderComparator::GetSharedRepresentation(void* val, ColumnType type) const {
+  // The mask used for setting the sign bit correctly.
+  if (val == NULL) return 0;
+  constexpr U mask = (U)1 << (sizeof(U) * 8 - 1);
+  switch (type.type) {
+    case TYPE_NULL:
+      return 0;
+    case TYPE_BOOLEAN:
+      return static_cast<U>(*reinterpret_cast<const bool*>(val)) << (sizeof(U) * 8 - 1);
+    case TYPE_TINYINT:
+      return GetSharedIntRepresentation<U, int8_t>(
+          *reinterpret_cast<const int8_t*>(val), mask);
+    case TYPE_SMALLINT:
+      return GetSharedIntRepresentation<U, int16_t>(
+          *reinterpret_cast<const int16_t*>(val), mask);
+    case TYPE_INT:
+      return GetSharedIntRepresentation<U, int32_t>(
+          *reinterpret_cast<const int32_t*>(val), mask);
+    case TYPE_BIGINT:
+      return GetSharedIntRepresentation<U, int64_t>(
+          *reinterpret_cast<const int64_t*>(val), mask);
+    case TYPE_DATE:
+      return GetSharedIntRepresentation<U, int32_t>(
+          reinterpret_cast<const DateValue*>(val)->Value(), mask);
+    case TYPE_FLOAT:
+      return GetSharedFloatRepresentation<U, float>(val, mask);
+    case TYPE_DOUBLE:
+      return GetSharedFloatRepresentation<U, double>(val, mask);
+    case TYPE_STRING:
+    case TYPE_VARCHAR: {
+      const StringValue* string_value = reinterpret_cast<const StringValue*>(val);
+      return GetSharedStringRepresentation<U>(string_value->ptr, string_value->len);
+    }
+    case TYPE_CHAR:
+      return GetSharedStringRepresentation<U>(
+          reinterpret_cast<const char*>(val), type.len);
+    case TYPE_TIMESTAMP: {
+      const TimestampValue* ts = reinterpret_cast<const TimestampValue*>(val);
+      const uint128_t nanosnds = static_cast<uint128_t>(ts->time().total_nanoseconds());
+      const uint128_t days = static_cast<uint128_t>(ts->date().day_number());
+      return (days << 64) | nanosnds;
+    }
+    case TYPE_DECIMAL:
+      switch (type.GetByteSize()) {
+        case 4:
+          return GetSharedIntRepresentation<U, int32_t>(
+              reinterpret_cast<const Decimal4Value*>(val)->value(), mask);
+        case 8:
+          return GetSharedIntRepresentation<U, int64_t>(
+              reinterpret_cast<const Decimal8Value*>(val)->value(), mask);
+        case 16: // value is of int128_t, big enough that no shifts are needed
+          return static_cast<U>(
+              reinterpret_cast<const Decimal16Value*>(val)->value()) ^ mask;
+        default:
+          DCHECK(false) << type;
+          return 0;
+      }
+    default:
+      return 0;
+  }
+}
+
+template <typename U, typename T>
+U inline TupleRowZOrderComparator::GetSharedIntRepresentation(const T val, U mask) const {
+  return (static_cast<U>(val) <<
+      std::max((sizeof(U) - sizeof(T)) * 8, (uint64_t)0)) ^ mask;
+}
+
+template <typename U, typename T>
+U inline TupleRowZOrderComparator::GetSharedFloatRepresentation(void* val, U mask) const {
+  int64_t tmp;
+  T floating_value = *reinterpret_cast<const T*>(val);
+  memcpy(&tmp, &floating_value, sizeof(T));
+  if (UNLIKELY(std::isnan(floating_value))) return 0;
+  if (floating_value < 0.0) {
+    // Flipping all bits for negative values.
+    return static_cast<U>(~tmp) << std::max((sizeof(U) - sizeof(T)) * 8, (uint64_t)0);
+  } else {
+    // Flipping only first bit.
+    return (static_cast<U>(tmp) << std::max((sizeof(U) - sizeof(T)) * 8, (uint64_t)0)) ^
+        mask;
+  }
+}
+
+template <typename U>
+U inline TupleRowZOrderComparator::GetSharedStringRepresentation(const char* char_ptr,
+    int length) const {
+  int len = length < sizeof(U) ? length : sizeof(U);
+  if (len == 0) return 0;
+  U dst = 0;
+  // We copy the bytes from the string but swap the bytes because of integer endianness.
+  BitUtil::ByteSwap(&dst, char_ptr, len);
+  return dst << ((sizeof(U) - len) * 8);
+}
+
+Status TupleRowZOrderComparator::CodegenCompare(LlvmCodeGen* codegen,
+    llvm::Function** fn) {
+  LOG(WARNING) << "TupleRowZOrderComparator::CodegenCompare is not yet implemented.";
+  return Status("TupleRowZOrderComparator has no Codegen'd comperator.");
+}
+
diff --git a/be/src/util/tuple-row-compare.h b/be/src/util/tuple-row-compare.h
index e1933d5..fdaa1ed 100644
--- a/be/src/util/tuple-row-compare.h
+++ b/be/src/util/tuple-row-compare.h
@@ -57,22 +57,14 @@ class ComparatorWrapper {
   }
 };
 
-/// Compares two TupleRows based on a set of exprs, in order.
+/// Interface for comparing two TupleRows based on a set of exprs.
 class TupleRowComparator {
  public:
   /// 'ordering_exprs': the ordering expressions for tuple comparison.
-  /// 'is_asc' determines, for each expr, if it should be ascending or descending sort
-  /// order.
-  /// 'nulls_first' determines, for each expr, if nulls should come before or after all
-  /// other values.
-  TupleRowComparator(const std::vector<ScalarExpr*>& ordering_exprs,
-      const std::vector<bool>& is_asc, const std::vector<bool>& nulls_first)
+  TupleRowComparator(const std::vector<ScalarExpr*>& ordering_exprs)
     : ordering_exprs_(ordering_exprs),
-      is_asc_(is_asc),
-      codegend_compare_fn_(nullptr) {
-    DCHECK_EQ(is_asc_.size(), ordering_exprs.size());
-    for (bool null_first : nulls_first) nulls_first_.push_back(null_first ? -1 : 1);
-  }
+      codegend_compare_fn_(nullptr) { }
+  virtual ~TupleRowComparator() {}
 
   /// Create the evaluators for the ordering expressions and store them in 'pool'. The
   /// evaluators use 'expr_perm_pool' and 'expr_results_pool' for permanent and result
@@ -113,15 +105,7 @@ class TupleRowComparator {
     return Less(lhs_row, rhs_row);
   }
 
- private:
-  /// Interpreted implementation of Compare().
-  int CompareInterpreted(const TupleRow* lhs, const TupleRow* rhs) const;
-
-  /// Codegen Compare(). Returns a non-OK status if codegen is unsuccessful.
-  /// TODO: inline this at codegen'd callsites instead of indirectly calling via function
-  /// pointer.
-  Status CodegenCompare(LlvmCodeGen* codegen, llvm::Function** fn);
-
+ protected:
   /// References to ordering expressions owned by the Exec node which owns this
   /// TupleRowComparator.
   const std::vector<ScalarExpr*>& ordering_exprs_;
@@ -131,9 +115,6 @@ class TupleRowComparator {
   std::vector<ScalarExprEvaluator*> ordering_expr_evals_lhs_;
   std::vector<ScalarExprEvaluator*> ordering_expr_evals_rhs_;
 
-  const std::vector<bool>& is_asc_;
-  std::vector<int8_t> nulls_first_;
-
   /// We store a pointer to the codegen'd function pointer (adding an extra level of
   /// indirection) so that copies of this TupleRowComparator will have the same pointer to
   /// the codegen'd function. This is necessary because the codegen'd function pointer is
@@ -144,6 +125,85 @@ class TupleRowComparator {
   typedef int (*CompareFn)(ScalarExprEvaluator* const*, ScalarExprEvaluator* const*,
       const TupleRow*, const TupleRow*);
   CompareFn* codegend_compare_fn_;
+ private:
+  /// Interpreted implementation of Compare().
+  virtual int CompareInterpreted(const TupleRow* lhs, const TupleRow* rhs) const = 0;
+
+  /// Codegen Compare(). Returns a non-OK status if codegen is unsuccessful.
+  /// TODO: inline this at codegen'd callsites instead of indirectly calling via function
+  /// pointer.
+  virtual Status CodegenCompare(LlvmCodeGen* codegen, llvm::Function** fn) = 0;
+};
+
+/// Compares two TupleRows based on a set of exprs, in lexicographical order.
+class TupleRowLexicalComparator : public TupleRowComparator {
+ public:
+  /// 'ordering_exprs': the ordering expressions for tuple comparison.
+  /// 'is_asc' determines, for each expr, if it should be ascending or descending sort
+  /// order.
+  /// 'nulls_first' determines, for each expr, if nulls should come before or after all
+  /// other values.
+  TupleRowLexicalComparator(const std::vector<ScalarExpr*>& ordering_exprs,
+      const std::vector<bool>& is_asc, const std::vector<bool>& nulls_first)
+    : TupleRowComparator(ordering_exprs),
+      is_asc_(is_asc) {
+    DCHECK_EQ(is_asc_.size(), ordering_exprs.size());
+    for (bool null_first : nulls_first) nulls_first_.push_back(null_first ? -1 : 1);
+  }
+
+ private:
+  const std::vector<bool>& is_asc_;
+  std::vector<int8_t> nulls_first_;
+
+  int CompareInterpreted(const TupleRow* lhs, const TupleRow* rhs) const override;
+
+  /// TODO: implement it.
+  Status CodegenCompare(LlvmCodeGen* codegen, llvm::Function** fn) override;
+};
+
+/// Compares two TupleRows based on a set of exprs, in Z-order.
+class TupleRowZOrderComparator : public TupleRowComparator {
+ public:
+  /// 'ordering_exprs': the ordering expressions for tuple comparison.
+  TupleRowZOrderComparator(const std::vector<ScalarExpr*>& ordering_exprs)
+    : TupleRowComparator(ordering_exprs) { }
+
+ private:
+  typedef __uint128_t uint128_t;
+
+  int CompareInterpreted(const TupleRow* lhs, const TupleRow* rhs) const override;
+  Status CodegenCompare(LlvmCodeGen* codegen, llvm::Function** fn) override;
+
+  /// Compares the rows using Z-ordering. The function does not calculate the actual
+  /// Z-value, only looks for the column with the most significant dimension, and compares
+  /// the values of that column. To make this possible, a unified type is necessary where
+  /// all values share the same bit-representation. The three common types which all the
+  /// others are converted to are uint32_t, uint64_t and uint128_t. Comparing smaller
+  /// types (ie. having less bits) with bigger ones would make the bigger type much more
+  /// dominant therefore the bits of these smaller types are shifted up.
+  template<typename U>
+  int CompareBasedOnSize(const TupleRow* lhs, const TupleRow* rhs) const;
+
+  /// We transform the original a and b values to their "shared representation", a' and b'
+  /// in a way that if a < b then a' is lexically less than b' regarding to their bits.
+  /// Thus, for ints INT_MIN would be 0, INT_MIN+1 would be 1, and so on, and in the end
+  /// INT_MAX would be 111..111.
+  /// The basic concept of getting the shared representation is as follows:
+  /// 1. Reinterpret void* as the actual type
+  /// 2. Convert the number to the chosen unsigned type (U)
+  /// 3. If U is bigger than the actual type, the bits of the small type are shifted up.
+  /// 4. Flip the sign bit because the value was converted to unsigned.
+  /// Note that floating points are represented differently, where for negative values all
+  /// bits have to get flipped.
+  /// Null values will be treated as the minimum value (unsigned 0).
+  template <typename U>
+  U GetSharedRepresentation(void* val, ColumnType type) const;
+  template <typename U>
+  U inline GetSharedStringRepresentation(const char* char_ptr, int length) const;
+  template <typename U, typename T>
+  U inline GetSharedIntRepresentation(const T val, U mask) const;
+  template <typename U, typename T>
+  U inline GetSharedFloatRepresentation(void* val, U mask) const;
 };
 
 /// Compares the equality of two Tuples, going slot by slot.
diff --git a/fe/src/main/java/org/apache/impala/analysis/TableDef.java b/fe/src/main/java/org/apache/impala/analysis/TableDef.java
index 40bae14..60ef906 100644
--- a/fe/src/main/java/org/apache/impala/analysis/TableDef.java
+++ b/fe/src/main/java/org/apache/impala/analysis/TableDef.java
@@ -667,19 +667,6 @@ class TableDef {
             "equivalent to SORT BY. Please, use the latter, if that was your " +
             "intention."));
       }
-
-      List<? extends Type> notSupportedTypes = Arrays.asList(Type.STRING, Type.VARCHAR,
-          Type.FLOAT, Type.DOUBLE);
-      for (Integer position : colIdxs) {
-        Type colType = columnTypes.get(position);
-
-        if (notSupportedTypes.stream().anyMatch(type -> colType.matchesType(type))) {
-          throw new AnalysisException(String.format("SORT BY ZORDER does not support "
-              + "column types: %s", String.join(", ",
-                  notSupportedTypes.stream().map(type -> type.toString())
-                  .collect(Collectors.toList()))));
-        }
-      }
     }
 
     Preconditions.checkState(numColumns == colIdxs.size());
diff --git a/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java b/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java
index a0faae2..a64b98a 100644
--- a/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java
@@ -1298,6 +1298,8 @@ public class AnalyzeDDLTest extends FrontendTestBase {
 
     AnalyzesOk("alter table functional.alltypes sort by zorder (int_col,id)");
     AnalyzesOk("alter table functional.alltypes sort by zorder (bool_col,int_col,id)");
+    AnalyzesOk(
+      "alter table functional.alltypes sort by zorder (timestamp_col, string_col)");
     AnalyzesOk("alter table functional.alltypes sort by zorder ()");
     AnalysisError("alter table functional.alltypes sort by zorder (id)",
         "SORT BY ZORDER with 1 column is equivalent to SORT BY. Please, use the " +
@@ -1308,9 +1310,6 @@ public class AnalyzeDDLTest extends FrontendTestBase {
         "not find SORT BY column 'foo' in table.");
     AnalysisError("alter table functional_hbase.alltypes sort by zorder (id, foo)",
         "ALTER TABLE SORT BY not supported on HBase tables.");
-    AnalysisError("alter table functional.alltypes sort by zorder (bool_col,string_col)",
-        "SORT BY ZORDER does not support column types: STRING, VARCHAR(*), FLOAT, " +
-        "DOUBLE");
 
     BackendConfig.INSTANCE.setZOrderSortUnlocked(false);
   }
@@ -1971,10 +1970,6 @@ public class AnalyzeDDLTest extends FrontendTestBase {
         + "'/test-warehouse/schemas/zipcode_incomes.parquet' sort by (id,zip)");
     AnalyzesOk("create table newtbl_DNE like parquet "
         + "'/test-warehouse/schemas/decimal.parquet' sort by zorder (d32, d11)");
-    AnalysisError("create table newtbl_DNE like parquet "
-        + "'/test-warehouse/schemas/zipcode_incomes.parquet' sort by  zorder (id,zip)",
-        "SORT BY ZORDER does not support column types: STRING, VARCHAR(*), FLOAT, " +
-        "DOUBLE");
     AnalyzesOk("create table if not exists functional.zipcode_incomes like parquet "
         + "'/test-warehouse/schemas/zipcode_incomes.parquet'");
     AnalyzesOk("create table if not exists newtbl_DNE like parquet "
@@ -2350,15 +2345,17 @@ public class AnalyzeDDLTest extends FrontendTestBase {
     AnalysisError("create table tbl sort by (int_col,foo) like functional.alltypes",
         "Could not find SORT BY column 'foo' in table.");
 
-    // Test zsort columns.
+    // Test sort by zorder columns.
     BackendConfig.INSTANCE.setZOrderSortUnlocked(true);
 
     AnalyzesOk("create table tbl sort by zorder (int_col,id) like functional.alltypes");
+    AnalyzesOk("create table tbl sort by zorder (float_col,id) like functional.alltypes");
+    AnalyzesOk("create table tbl sort by zorder (double_col,id) like " +
+        "functional.alltypes");
+    AnalyzesOk("create table tbl sort by zorder (string_col,timestamp_col) like " +
+        "functional.alltypes");
     AnalysisError("create table tbl sort by zorder (int_col,foo) like " +
         "functional.alltypes", "Could not find SORT BY column 'foo' in table.");
-    AnalysisError("create table tbl sort by zorder (string_col,id) like " +
-        "functional.alltypes", "SORT BY ZORDER does not support column types: STRING, " +
-        "VARCHAR(*), FLOAT, DOUBLE");
 
     BackendConfig.INSTANCE.setZOrderSortUnlocked(false);
   }
@@ -2694,19 +2691,6 @@ public class AnalyzeDDLTest extends FrontendTestBase {
     AnalysisError("create table functional.new_table (i int) PARTITIONED BY (d decimal)" +
         "sort by zorder (i, d)", "SORT BY column list must not contain partition " +
         "column: 'd'");
-    // For Z-Order, string, varchar, float and double columns are not supported.
-    AnalysisError("create table functional.new_table (i int, s string) sort by zorder " +
-        "(i, s)", "SORT BY ZORDER does not support column types: STRING, VARCHAR(*), " +
-        "FLOAT, DOUBLE");
-    AnalysisError("create table functional.new_table (i int, s varchar(32)) sort by " +
-        "zorder (i, s)", "SORT BY ZORDER does not support column types: STRING, " +
-        "VARCHAR(*), FLOAT, DOUBLE");
-    AnalysisError("create table functional.new_table (i int, s double) sort by zorder " +
-        "(i, s)", "SORT BY ZORDER does not support column types: STRING, VARCHAR(*), " +
-        "FLOAT, DOUBLE");
-    AnalysisError("create table functional.new_table (i int, s float) sort by zorder " +
-        "(i, s)", "SORT BY ZORDER does not support column types: STRING, VARCHAR(*), " +
-        "FLOAT, DOUBLE");
 
     BackendConfig.INSTANCE.setZOrderSortUnlocked(false);
   }


[impala] 03/03: IMPALA-9389: [DOCS] Support reading zstd text files

Posted by as...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

asherman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 519093fbb5c499aaf588999c77d8a711e8bd2aab
Author: Kris Hahn <kh...@cloudera.com>
AuthorDate: Wed Feb 26 20:02:40 2020 -0800

    IMPALA-9389: [DOCS] Support reading zstd text files
    
    In impala_txtfile.xml:
    - corrected file extension to csv_compressed_zstd.csv.zst
    Change-Id: Ic83137bd2c3a49398fb60cf1901f8b74ed111fce
    Reviewed-on: http://gerrit.cloudera.org:8080/15304
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 docs/topics/impala_file_formats.xml |  6 +--
 docs/topics/impala_txtfile.xml      | 74 +++++++++++++++++++------------------
 2 files changed, 40 insertions(+), 40 deletions(-)

diff --git a/docs/topics/impala_file_formats.xml b/docs/topics/impala_file_formats.xml
index a87da92..d016da3 100644
--- a/docs/topics/impala_file_formats.xml
+++ b/docs/topics/impala_file_formats.xml
@@ -147,9 +147,7 @@ under the License.
             <entry>
               Unstructured
             </entry>
-            <entry rev="2.0.0">
-              LZO, gzip, bzip2, Snappy
-            </entry>
+            <entry rev="2.0.0"> LZO, gzip, bzip2, Snappy, zstd</entry>
             <entry>
               Yes. For <codeph>CREATE TABLE</codeph> with no <codeph>STORED AS</codeph> clause,
               the default file format is uncompressed text, with values separated by ASCII
@@ -314,7 +312,7 @@ under the License.
       </dlentry>
       <dlentry>
         <dt>Zstd</dt>
-        <dd>For Parquet files only.</dd>
+        <dd>For Parquet and text files only.</dd>
       </dlentry>
 
       <dlentry>
diff --git a/docs/topics/impala_txtfile.xml b/docs/topics/impala_txtfile.xml
index ecf11bb..4491aa7 100644
--- a/docs/topics/impala_txtfile.xml
+++ b/docs/topics/impala_txtfile.xml
@@ -120,14 +120,13 @@ under the License.
         details.
       </p>
 
-      <p rev="2.0.0">
-        In Impala 2.0 and later, you can also use text data compressed in the gzip, bzip2, or Snappy formats.
-        Because these compressed formats are not <q>splittable</q> in the way that LZO is, there is less
-        opportunity for Impala to parallelize queries on them. Therefore, use these types of compressed data only
-        for convenience if that is the format in which you receive the data. Prefer to use LZO compression for text
-        data if you have the choice, or convert the data to Parquet using an <codeph>INSERT ... SELECT</codeph>
-        statement to copy the original data into a Parquet table.
-      </p>
+      <p rev="2.0.0">You can also use text data compressed in the bzip2, gzip, Snappy, or zstd
+        formats. Because these compressed formats are not <q>splittable</q> in the way that LZO is,
+        there is less opportunity for Impala to parallelize queries on them. Therefore, use these
+        types of compressed data only for convenience if that is the format in which you receive the
+        data. Prefer to use LZO compression for text data if you have the choice, or convert the
+        data to Parquet using an <codeph>INSERT ... SELECT</codeph> statement to copy the original
+        data into a Parquet table. </p>
 
       <note rev="2.2.0">
         <p>
@@ -135,11 +134,14 @@ under the License.
           multiple streams created by the <codeph>pbzip2</codeph> command. Impala decodes only the data from the
           first part of such files, leading to incomplete results.
         </p>
+      </note>
 
         <p>
           The maximum size that Impala can accommodate for an individual bzip file is 1 GB (after uncompression).
         </p>
-      </note>
+        <p>
+          Impala supports zstd files created by the zstd command line tool.
+        </p>
 
       <p conref="../shared/impala_common.xml#common/s3_block_splitting"/>
 
@@ -630,39 +632,37 @@ hive&gt; INSERT INTO TABLE lzo_t SELECT col1, col2 FROM uncompressed_text_table;
 
   <concept rev="2.0.0" id="gzip">
 
-    <title>Using gzip, bzip2, or Snappy-Compressed Text Files</title>
+    <title>Using bzip2, gzip, Snappy-Compressed, or zstd Text Files</title>
   <prolog>
     <metadata>
       <data name="Category" value="Snappy"/>
       <data name="Category" value="Gzip"/>
+      <data name="Category" value="Zstd"/>
       <data name="Category" value="Compression"/>
     </metadata>
   </prolog>
 
     <conbody>
 
-      <p> In Impala 2.0 and later, Impala supports using text data files that
-        employ gzip, bzip2, or Snappy compression. These compression types are
-        primarily for convenience within an existing ETL pipeline rather than
-        maximum performance. Although it requires less I/O to read compressed
-        text than the equivalent uncompressed text, files compressed by these
-        codecs are not <q>splittable</q> and therefore cannot take full
-        advantage of the Impala parallel query capability. </p>
-
-      <p>
-        As each bzip2- or Snappy-compressed text file is processed, the node doing the work reads the entire file
-        into memory and then decompresses it. Therefore, the node must have enough memory to hold both the
-        compressed and uncompressed data from the text file. The memory required to hold the uncompressed data is
-        difficult to estimate in advance, potentially causing problems on systems with low memory limits or with
-        resource management enabled. <ph rev="2.1.0">In Impala 2.1 and higher, this memory overhead is reduced for
-        gzip-compressed text files. The gzipped data is decompressed as it is read, rather than all at once.</ph>
+      <p> Impala supports using text data files that employ bzip2, gzip, Snappy, or zstd
+        compression. These compression types are primarily for convenience within an existing ETL
+        pipeline rather than maximum performance. Although it requires less I/O to read compressed
+        text than the equivalent uncompressed text, files compressed by these codecs are not
+          <q>splittable</q> and therefore cannot take full advantage of the Impala parallel query
+        capability. Impala can read compressed text files written by Hive.</p>
+
+      <p> As each Snappy-compressed file is processed, the node doing the work reads the entire file
+        into memory and then decompresses it. Therefore, the node must have enough memory to hold
+        both the compressed and uncompressed data from the text file. The memory required to hold
+        the uncompressed data is difficult to estimate in advance, potentially causing problems on
+        systems with low memory limits or with resource management enabled. <ph rev="2.1.0">This
+          memory overhead is reduced for bzip-, gzip-, and zstd-compressed text files. The
+          compressed data is decompressed as it is read, rather than all at once.</ph>
       </p>
 
-      <p>
-        To create a table to hold gzip, bzip2, or Snappy-compressed text, create a text table with no special
-        compression options. Specify the delimiter and escape character if required, using the <codeph>ROW
-        FORMAT</codeph> clause.
-      </p>
+      <p> To create a table to hold compressed text, create a text table with no special compression
+        options. Specify the delimiter and escape character if required, using the <codeph>ROW
+          FORMAT</codeph> clause. </p>
 
       <p>
         Because Impala can query compressed text files but currently cannot write them, produce the compressed text
@@ -671,11 +671,10 @@ hive&gt; INSERT INTO TABLE lzo_t SELECT col1, col2 FROM uncompressed_text_table;
         the <codeph>LOCATION</codeph> attribute at a directory containing existing compressed text files.)
       </p>
 
-      <p>
-        For Impala to recognize the compressed text files, they must have the appropriate file extension
-        corresponding to the compression codec, either <codeph>.gz</codeph>, <codeph>.bz2</codeph>, or
-        <codeph>.snappy</codeph>. The extensions can be in uppercase or lowercase.
-      </p>
+      <p> For Impala to recognize the compressed text files, they must have the appropriate file
+        extension corresponding to the compression codec, either <codeph>.bz2</codeph>,
+          <codeph>.gz</codeph>, <codeph>.snappy</codeph>, or <codeph>.zst</codeph>. The extensions
+        can be in uppercase or lowercase. </p>
 
       <p>
         The following example shows how you can create a regular text table, put different kinds of compressed and
@@ -689,7 +688,7 @@ hive&gt; INSERT INTO TABLE lzo_t SELECT col1, col2 FROM uncompressed_text_table;
 insert into csv_compressed values
   ('one - uncompressed', 'two - uncompressed', 'three - uncompressed'),
   ('abc - uncompressed', 'xyz - uncompressed', '123 - uncompressed');
-...make equivalent .gz, .bz2, and .snappy files and load them into same table directory...
+...make equivalent .bz2, .gz, .snappy, and .zst files and load them into same table directory...
 
 select * from csv_compressed;
 +--------------------+--------------------+----------------------+
@@ -702,6 +701,8 @@ select * from csv_compressed;
 | abc - bz2          | xyz - bz2          | 123 - bz2            |
 | one - gzip         | two - gzip         | three - gzip         |
 | abc - gzip         | xyz - gzip         | 123 - gzip           |
+| one - zstd         | two - zstd         | three - zstd         |
+| abc - zstd         | xyz - zstd         | 123 - zstd           |
 +--------------------+--------------------+----------------------+
 
 $ hdfs dfs -ls 'hdfs://127.0.0.1:8020/user/hive/warehouse/file_formats.db/csv_compressed/';
@@ -709,6 +710,7 @@ $ hdfs dfs -ls 'hdfs://127.0.0.1:8020/user/hive/warehouse/file_formats.db/csv_co
 75 hdfs://127.0.0.1:8020/user/hive/warehouse/file_formats.db/csv_compressed/csv_compressed.snappy
 79 hdfs://127.0.0.1:8020/user/hive/warehouse/file_formats.db/csv_compressed/csv_compressed_bz2.csv.bz2
 80 hdfs://127.0.0.1:8020/user/hive/warehouse/file_formats.db/csv_compressed/csv_compressed_gzip.csv.gz
+85 hdfs://127.0.0.1:8020/user/hive/warehouse/file_formats.db/csv_compressed/csv_compressed_zstd.csv.zst
 116 hdfs://127.0.0.1:8020/user/hive/warehouse/file_formats.db/csv_compressed/dd414df64d67d49b_data.0.
 </codeblock>
 


[impala] 01/03: IMPALA-9226: Improve string allocations of the ORC scanner

Posted by as...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

asherman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit c48efd407e7b857c6df0d167aafe02f93c81e2fb
Author: norbert.luksa <no...@cloudera.com>
AuthorDate: Mon Jan 13 16:19:41 2020 +0100

    IMPALA-9226: Improve string allocations of the ORC scanner
    
    Currently the OrcColumnReader copies values from the
    orc::StringVectorBatch one-by-one. Since ORC 1.6, the blob which
    contains the pointed values is moved to the StringVectorBatch,
    so we can copy it.
    
    This commit beside the above improvement also enables the
    LazyEncoding option for the ORC reader. This way, for stripes
    with DICTIONARY_ENCODING[_V2], EncodedStringVectorBatch contains
    the data in a dictionaryBlob from which the data can be acquired
    with the given indices and lengths.
    
    Tests:
     * Run ORC scanner tests (query_tests/test_scanners.py::TestOrc)
       and tpch query tests.
     * Tested performance on tpch.lineitem table with scale=25,
       running queries that selects min of string columns.
       Some results:
       col_name     | encoding | before | after | speedup
       =============================================================
       l_comment      DIRECT     16.42s   14.38s  14%
       l_shipinstruct DICTIONARY 5.26s    3.80s   32%
       l_commitdate   DICTIONARY 5.46s    5.19s   5%
       all string col BOTH       39.06s   32.18s  21%
    
       The queries were run on a desktop PC with MT_DOP and NUM_NODES
       set to 1.
     * Also run TPC-H queries on the TPC-H benchmark where some
       queries' runtime improved by around 10-15%, while there were
       no regression for the others.
    
    Change-Id: If2d975946fb6f4104d8dc98895285b3a0c6bef7f
    Reviewed-on: http://gerrit.cloudera.org:8080/15051
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/exec/hdfs-orc-scanner.cc   | 21 ++++++++--
 be/src/exec/hdfs-orc-scanner.h    |  7 ++++
 be/src/exec/orc-column-readers.cc | 83 +++++++++++++++++++++++++++------------
 be/src/exec/orc-column-readers.h  | 66 ++++++++++++++++++++++++-------
 4 files changed, 135 insertions(+), 42 deletions(-)

diff --git a/be/src/exec/hdfs-orc-scanner.cc b/be/src/exec/hdfs-orc-scanner.cc
index a38b345..1a607f3 100644
--- a/be/src/exec/hdfs-orc-scanner.cc
+++ b/be/src/exec/hdfs-orc-scanner.cc
@@ -134,6 +134,8 @@ void HdfsOrcScanner::ScanRangeInputStream::read(void* buf, uint64_t length,
 
 HdfsOrcScanner::HdfsOrcScanner(HdfsScanNodeBase* scan_node, RuntimeState* state)
   : HdfsScanner(scan_node, state),
+    dictionary_pool_(new MemPool(scan_node->mem_tracker())),
+    data_batch_pool_(new MemPool(scan_node->mem_tracker())),
     assemble_rows_timer_(scan_node_->materialize_tuple_timer()) {
   assemble_rows_timer_.Stop();
 }
@@ -184,6 +186,10 @@ Status HdfsOrcScanner::Open(ScannerContext* context) {
   // Update 'row_reader_options_' based on the tuple descriptor so the ORC lib can skip
   // columns we don't need.
   RETURN_IF_ERROR(SelectColumns(*scan_node_->tuple_desc()));
+  // By enabling lazy decoding, String stripes with DICTIONARY_ENCODING[_V2] can be
+  // stored in an EncodedStringVectorBatch, where the data is stored in a dictionary
+  // blob more efficiently.
+  row_reader_options_.setEnableLazyDecoding(true);
 
   // Build 'col_id_path_map_' that maps from ORC column ids to their corresponding
   // SchemaPath in the table. The map is used in the constructors of OrcColumnReaders
@@ -226,18 +232,24 @@ void HdfsOrcScanner::Close(RowBatch* row_batch) {
   if (row_batch != nullptr) {
     context_->ReleaseCompletedResources(true);
     row_batch->tuple_data_pool()->AcquireData(template_tuple_pool_.get(), false);
+    row_batch->tuple_data_pool()->AcquireData(dictionary_pool_.get(), false);
+    row_batch->tuple_data_pool()->AcquireData(data_batch_pool_.get(), false);
     if (scan_node_->HasRowBatchQueue()) {
       static_cast<HdfsScanNode*>(scan_node_)->AddMaterializedRowBatch(
           unique_ptr<RowBatch>(row_batch));
     }
   } else {
     template_tuple_pool_->FreeAll();
+    dictionary_pool_->FreeAll();
+    data_batch_pool_->FreeAll();
     context_->ReleaseCompletedResources(true);
   }
   orc_root_batch_.reset(nullptr);
 
   // Verify all resources (if any) have been transferred.
   DCHECK_EQ(template_tuple_pool_->total_allocated_bytes(), 0);
+  DCHECK_EQ(dictionary_pool_->total_allocated_bytes(), 0);
+  DCHECK_EQ(data_batch_pool_->total_allocated_bytes(), 0);
 
   assemble_rows_timer_.Stop();
   assemble_rows_timer_.ReleaseCounter();
@@ -501,8 +513,10 @@ Status HdfsOrcScanner::GetNextInternal(RowBatch* row_batch) {
   // to can be skip. 'end_of_stripe_' marks whether current stripe is drained. It's only
   // set to true in 'AssembleRows'.
   while (advance_stripe_ || end_of_stripe_) {
+    // The next stripe will use a new dictionary blob so transfer the memory to row_batch.
+    row_batch->tuple_data_pool()->AcquireData(dictionary_pool_.get(), false);
     context_->ReleaseCompletedResources(/* done */ true);
-    // Commit the rows to flush the row batch from the previous stripe
+    // Commit the rows to flush the row batch from the previous stripe.
     RETURN_IF_ERROR(CommitRows(0, row_batch));
 
     RETURN_IF_ERROR(NextStripe());
@@ -617,7 +631,7 @@ Status HdfsOrcScanner::AssembleRows(RowBatch* row_batch) {
   if (!continue_execution) return Status::CancelledInternal("ORC scanner");
 
   // We're going to free the previous batch. Clear the reference first.
-  orc_root_reader_->UpdateInputBatch(nullptr);
+  RETURN_IF_ERROR(orc_root_reader_->UpdateInputBatch(nullptr));
 
   orc_root_batch_ = row_reader_->createRowBatch(row_batch->capacity());
   DCHECK_EQ(orc_root_batch_->numElements, 0);
@@ -625,9 +639,10 @@ Status HdfsOrcScanner::AssembleRows(RowBatch* row_batch) {
   int64_t num_rows_read = 0;
   while (continue_execution) {  // one ORC batch (ColumnVectorBatch) in a round
     if (orc_root_reader_->EndOfBatch()) {
+      row_batch->tuple_data_pool()->AcquireData(data_batch_pool_.get(), false);
       try {
         end_of_stripe_ |= !row_reader_->next(*orc_root_batch_);
-        orc_root_reader_->UpdateInputBatch(orc_root_batch_.get());
+        RETURN_IF_ERROR(orc_root_reader_->UpdateInputBatch(orc_root_batch_.get()));
         if (end_of_stripe_) break; // no more data to process
       } catch (ResourceError& e) {
         parse_status_ = e.GetStatus();
diff --git a/be/src/exec/hdfs-orc-scanner.h b/be/src/exec/hdfs-orc-scanner.h
index d26ca88..a6ba523 100644
--- a/be/src/exec/hdfs-orc-scanner.h
+++ b/be/src/exec/hdfs-orc-scanner.h
@@ -170,6 +170,13 @@ class HdfsOrcScanner : public HdfsScanner {
   /// Mem pool used in orc readers.
   boost::scoped_ptr<OrcMemPool> reader_mem_pool_;
 
+  /// Pool to copy dictionary buffer into.
+  /// This pool is shared across all the batches in a stripe.
+  boost::scoped_ptr<MemPool> dictionary_pool_;
+  /// Pool to copy non-dictionary buffer into. This pool is responsible for handling
+  /// vector batches that do not necessarily fit into one row batch.
+  boost::scoped_ptr<MemPool> data_batch_pool_;
+
   std::unique_ptr<OrcSchemaResolver> schema_resolver_ = nullptr;
 
   /// orc::Reader's responsibility is to read the footer and metadata from an ORC file.
diff --git a/be/src/exec/orc-column-readers.cc b/be/src/exec/orc-column-readers.cc
index 6216d10..99608af 100644
--- a/be/src/exec/orc-column-readers.cc
+++ b/be/src/exec/orc-column-readers.cc
@@ -149,13 +149,45 @@ Status OrcBoolColumnReader::ReadValue(int row_idx, Tuple* tuple, MemPool* pool)
   return Status::OK();
 }
 
+Status OrcStringColumnReader::InitBlob(orc::DataBuffer<char>* blob, MemPool* pool) {
+  // TODO: IMPALA-9310: Possible improvement is moving the buffer out from orc::DataBuffer
+  // instead of copying and let Impala free the memory later.
+  blob_ = reinterpret_cast<char*>(pool->TryAllocateUnaligned(blob->size()));
+  if (UNLIKELY(blob_ == nullptr)) {
+    string details = Substitute("Could not allocate string buffer of $0 bytes "
+        "for ORC file '$1'.", blob->size(), scanner_->filename());
+    return scanner_->scan_node_->mem_tracker()->MemLimitExceeded(
+        scanner_->state_, details, blob->size());
+  }
+  memcpy(blob_, blob->data(), blob->size());
+  return Status::OK();
+}
+
 Status OrcStringColumnReader::ReadValue(int row_idx, Tuple* tuple, MemPool* pool) {
   if (IsNull(DCHECK_NOTNULL(batch_), row_idx)) {
     SetNullSlot(tuple);
     return Status::OK();
   }
-  const char* src_ptr = batch_->data.data()[row_idx];
-  int64_t src_len = batch_->length.data()[row_idx];
+  char* src_ptr;
+  int src_len;
+
+  if (batch_->isEncoded) {
+    orc::EncodedStringVectorBatch* currentBatch =
+        static_cast<orc::EncodedStringVectorBatch*>(batch_);
+
+    orc::DataBuffer<int64_t>& offsets = currentBatch->dictionary->dictionaryOffset;
+    int64_t index = currentBatch->index[row_idx];
+    if (UNLIKELY(index < 0  || static_cast<uint64_t>(index) + 1 >= offsets.size())) {
+      return Status(Substitute("Corrupt ORC file: $0. Index ($1) out of range [0, $2) in "
+          "StringDictionary.", scanner_->filename(), index, offsets.size()));;
+    }
+    src_ptr = blob_ + offsets[index];
+    src_len = offsets[index + 1] - offsets[index];
+  } else {
+    // The pointed data is now in blob_, a buffer handled by Impala.
+    src_ptr = blob_ + (batch_->data[row_idx] - batch_->blob.data());
+    src_len = batch_->length[row_idx];
+  }
   int dst_len = slot_desc_->type().len;
   if (slot_desc_->type().type == TYPE_CHAR) {
     int unpadded_len = min(dst_len, static_cast<int>(src_len));
@@ -170,17 +202,7 @@ Status OrcStringColumnReader::ReadValue(int row_idx, Tuple* tuple, MemPool* pool
   } else {
     dst->len = src_len;
   }
-  // Space in the StringVectorBatch is allocated by scanner_->reader_mem_pool_. It will
-  // be reused at next batch, so we allocate a new space for this string.
-  uint8_t* buffer = pool->TryAllocateUnaligned(dst->len);
-  if (buffer == nullptr) {
-    string details = Substitute("Could not allocate string buffer of $0 bytes "
-        "for ORC file '$1'.", dst->len, scanner_->filename());
-    return scanner_->scan_node_->mem_tracker()->MemLimitExceeded(
-        scanner_->state_, details, dst->len);
-  }
-  dst->ptr = reinterpret_cast<char*>(buffer);
-  memcpy(dst->ptr, src_ptr, dst->len);
+  dst->ptr = src_ptr;
   return Status::OK();
 }
 
@@ -348,21 +370,24 @@ Status OrcStructReader::ReadValue(int row_idx, Tuple* tuple, MemPool* pool) {
   return Status::OK();
 }
 
-void OrcStructReader::UpdateInputBatch(orc::ColumnVectorBatch* orc_batch) {
-  OrcComplexColumnReader::UpdateInputBatch(orc_batch);
+Status OrcStructReader::UpdateInputBatch(orc::ColumnVectorBatch* orc_batch) {
+  RETURN_IF_ERROR(OrcComplexColumnReader::UpdateInputBatch(orc_batch));
   batch_ = static_cast<orc::StructVectorBatch*>(orc_batch);
   // In debug mode, we use dynamic_cast<> to double-check the downcast is legal
   DCHECK(batch_ == dynamic_cast<orc::StructVectorBatch*>(orc_batch));
   if (batch_ == nullptr || batch_->numElements == 0) {
     row_idx_ = 0;
-    for (OrcColumnReader* child : children_) child->UpdateInputBatch(nullptr);
-    return;
+    for (OrcColumnReader* child : children_) {
+      RETURN_IF_ERROR(child->UpdateInputBatch(nullptr));
+    }
+    return Status::OK();
   }
   row_idx_ = 0;
   int size = children_.size();
   for (int c = 0; c < size; ++c) {
-    children_[c]->UpdateInputBatch(batch_->fields[children_fields_[c]]);
+    RETURN_IF_ERROR(children_[c]->UpdateInputBatch(batch_->fields[children_fields_[c]]));
   }
+  return Status::OK();
 }
 
 Status OrcStructReader::TransferTuple(Tuple* tuple, MemPool* pool) {
@@ -447,17 +472,20 @@ OrcListReader::OrcListReader(const orc::Type* node, const SlotDescriptor* slot_d
       << (tuple_desc_ != nullptr ? tuple_desc_->DebugString() : "null");
 }
 
-void OrcListReader::UpdateInputBatch(orc::ColumnVectorBatch* orc_batch) {
-  OrcComplexColumnReader::UpdateInputBatch(orc_batch);
+Status OrcListReader::UpdateInputBatch(orc::ColumnVectorBatch* orc_batch) {
+  RETURN_IF_ERROR(OrcComplexColumnReader::UpdateInputBatch(orc_batch));
   batch_ = static_cast<orc::ListVectorBatch*>(orc_batch);
   // In debug mode, we use dynamic_cast<> to double-check the downcast is legal
   DCHECK(batch_ == dynamic_cast<orc::ListVectorBatch*>(orc_batch));
   orc::ColumnVectorBatch* item_batch = batch_ ? batch_->elements.get() : nullptr;
-  for (OrcColumnReader* child : children_) child->UpdateInputBatch(item_batch);
+  for (OrcColumnReader* child : children_) {
+    RETURN_IF_ERROR(child->UpdateInputBatch(item_batch));
+  }
   if (batch_) {
     row_idx_ = -1;
     NextRow();
   }
+  return Status::OK();
 }
 
 int OrcListReader::GetNumTuples(int row_idx) const {
@@ -581,19 +609,24 @@ OrcMapReader::OrcMapReader(const orc::Type* node, const SlotDescriptor* slot_des
   }
 }
 
-void OrcMapReader::UpdateInputBatch(orc::ColumnVectorBatch* orc_batch) {
-  OrcComplexColumnReader::UpdateInputBatch(orc_batch);
+Status OrcMapReader::UpdateInputBatch(orc::ColumnVectorBatch* orc_batch) {
+  RETURN_IF_ERROR(OrcComplexColumnReader::UpdateInputBatch(orc_batch));
   batch_ = static_cast<orc::MapVectorBatch*>(orc_batch);
   // In debug mode, we use dynamic_cast<> to double-check the downcast is legal
   DCHECK(batch_ == dynamic_cast<orc::MapVectorBatch*>(orc_batch));
   orc::ColumnVectorBatch* key_batch = batch_ ? batch_->keys.get() : nullptr;
   orc::ColumnVectorBatch* value_batch = batch_ ? batch_->elements.get() : nullptr;
-  for (OrcColumnReader* child : key_readers_) child->UpdateInputBatch(key_batch);
-  for (OrcColumnReader* child : value_readers_) child->UpdateInputBatch(value_batch);
+  for (OrcColumnReader* child : key_readers_) {
+    RETURN_IF_ERROR(child->UpdateInputBatch(key_batch));
+  }
+  for (OrcColumnReader* child : value_readers_) {
+    RETURN_IF_ERROR(child->UpdateInputBatch(value_batch));
+  }
   if (batch_) {
     row_idx_ = -1;
     NextRow();
   }
+  return Status::OK();
 }
 
 void OrcMapReader::NextRow() {
diff --git a/be/src/exec/orc-column-readers.h b/be/src/exec/orc-column-readers.h
index 222b82d..d88c805 100644
--- a/be/src/exec/orc-column-readers.h
+++ b/be/src/exec/orc-column-readers.h
@@ -85,7 +85,8 @@ class OrcColumnReader {
   virtual bool IsCollectionReader() const { return false; }
 
   /// Update the orc batch we tracked. We'll read values from it.
-  virtual void UpdateInputBatch(orc::ColumnVectorBatch* orc_batch) = 0;
+  virtual Status UpdateInputBatch(orc::ColumnVectorBatch* orc_batch)
+      WARN_UNUSED_RESULT = 0;
 
   /// Read value at 'row_idx' of the ColumnVectorBatch into a slot of the given 'tuple'.
   /// Use 'pool' to allocate memory in need. Depends on the UpdateInputBatch being called
@@ -125,10 +126,11 @@ class OrcBoolColumnReader : public OrcColumnReader {
       HdfsOrcScanner* scanner)
       : OrcColumnReader(node, slot_desc, scanner) { }
 
-  void UpdateInputBatch(orc::ColumnVectorBatch* orc_batch) override {
+  Status UpdateInputBatch(orc::ColumnVectorBatch* orc_batch) override WARN_UNUSED_RESULT {
     batch_ = static_cast<orc::LongVectorBatch*>(orc_batch);
     // In debug mode, we use dynamic_cast<> to double-check the downcast is legal
     DCHECK(batch_ == dynamic_cast<orc::LongVectorBatch*>(orc_batch));
+    return Status::OK();
   }
 
   Status ReadValue(int row_idx, Tuple* tuple, MemPool* pool) override WARN_UNUSED_RESULT;
@@ -143,9 +145,10 @@ class OrcIntColumnReader : public OrcColumnReader {
       HdfsOrcScanner* scanner)
       : OrcColumnReader(node, slot_desc, scanner) { }
 
-  void UpdateInputBatch(orc::ColumnVectorBatch* orc_batch) override {
+  Status UpdateInputBatch(orc::ColumnVectorBatch* orc_batch) override WARN_UNUSED_RESULT {
     batch_ = static_cast<orc::LongVectorBatch*>(orc_batch);
     DCHECK(batch_ == static_cast<orc::LongVectorBatch*>(orc_batch));
+    return Status::OK();
   }
 
   Status ReadValue(int row_idx, Tuple* tuple, MemPool* pool)
@@ -169,9 +172,10 @@ class OrcDoubleColumnReader : public OrcColumnReader {
       HdfsOrcScanner* scanner)
       : OrcColumnReader(node, slot_desc, scanner) { }
 
-  void UpdateInputBatch(orc::ColumnVectorBatch* orc_batch) override {
+  Status UpdateInputBatch(orc::ColumnVectorBatch* orc_batch) override WARN_UNUSED_RESULT {
     batch_ = static_cast<orc::DoubleVectorBatch*>(orc_batch);
     DCHECK(batch_ == dynamic_cast<orc::DoubleVectorBatch*>(orc_batch));
+    return Status::OK();
   }
 
   Status ReadValue(int row_idx, Tuple* tuple, MemPool* pool)
@@ -194,14 +198,43 @@ class OrcStringColumnReader : public OrcColumnReader {
       HdfsOrcScanner* scanner)
       : OrcColumnReader(node, slot_desc, scanner) { }
 
-  void UpdateInputBatch(orc::ColumnVectorBatch* orc_batch) override {
+  Status UpdateInputBatch(orc::ColumnVectorBatch* orc_batch) override WARN_UNUSED_RESULT {
     batch_ = static_cast<orc::StringVectorBatch*>(orc_batch);
-    DCHECK(batch_ == dynamic_cast<orc::StringVectorBatch*>(orc_batch));
+    if (orc_batch == nullptr) return Status::OK();
+    // We update the blob of a non-encoded batch every time, but since the dictionary blob
+    // is the same for the stripe, we only reset it for every new stripe.
+    // Note that this is possible since the encoding should be the same for every batch
+    // through the whole stripe.
+    if(!orc_batch->isEncoded) {
+      DCHECK(batch_ == dynamic_cast<orc::StringVectorBatch*>(orc_batch));
+      return InitBlob(&batch_->blob, scanner_->data_batch_pool_.get());
+    }
+    DCHECK(static_cast<orc::EncodedStringVectorBatch*>(batch_) ==
+        dynamic_cast<orc::EncodedStringVectorBatch*>(orc_batch));
+    if (last_stripe_idx_ != scanner_->stripe_idx_) {
+      last_stripe_idx_ = scanner_->stripe_idx_;
+      auto current_batch = static_cast<orc::EncodedStringVectorBatch*>(batch_);
+      return InitBlob(&current_batch->dictionary->dictionaryBlob,
+          scanner_->dictionary_pool_.get());
+    }
+    return Status::OK();
   }
 
   Status ReadValue(int row_idx, Tuple* tuple, MemPool* pool) override WARN_UNUSED_RESULT;
  private:
   orc::StringVectorBatch* batch_ = nullptr;
+  // We copy the blob from the batch, so the memory will be handled by Impala, and not
+  // by the ORC lib.
+  char* blob_ = nullptr;
+
+  // We cache the last stripe so we know when we have to update the blob (in case of
+  // dictionary encoding).
+  int last_stripe_idx_ = -1;
+
+  /// Initializes the blob if it has not been already in the current batch.
+  /// Unfortunately, this cannot be done in UpdateInputBatch, since we do not have
+  /// access to the pool there.
+  Status InitBlob(orc::DataBuffer<char>* blob, MemPool* pool);
 };
 
 class OrcTimestampReader : public OrcColumnReader {
@@ -210,9 +243,10 @@ class OrcTimestampReader : public OrcColumnReader {
       HdfsOrcScanner* scanner)
       : OrcColumnReader(node, slot_desc, scanner) { }
 
-  void UpdateInputBatch(orc::ColumnVectorBatch* orc_batch) override {
+  Status UpdateInputBatch(orc::ColumnVectorBatch* orc_batch) override WARN_UNUSED_RESULT {
     batch_ = static_cast<orc::TimestampVectorBatch*>(orc_batch);
     DCHECK(batch_ == dynamic_cast<orc::TimestampVectorBatch*>(orc_batch));
+    return Status::OK();
   }
 
   Status ReadValue(int row_idx, Tuple* tuple, MemPool* pool) override WARN_UNUSED_RESULT;
@@ -226,9 +260,10 @@ class OrcDateColumnReader : public OrcColumnReader {
       HdfsOrcScanner* scanner)
       : OrcColumnReader(node, slot_desc, scanner) { }
 
-  void UpdateInputBatch(orc::ColumnVectorBatch* orc_batch) override {
+  Status UpdateInputBatch(orc::ColumnVectorBatch* orc_batch) override WARN_UNUSED_RESULT {
     batch_ = static_cast<orc::LongVectorBatch*>(orc_batch);
     DCHECK(batch_ == dynamic_cast<orc::LongVectorBatch*>(orc_batch));
+    return Status::OK();
   }
 
   Status ReadValue(int row_idx, Tuple* tuple, MemPool* pool) override WARN_UNUSED_RESULT;
@@ -243,10 +278,11 @@ class OrcDecimalColumnReader : public OrcColumnReader {
       HdfsOrcScanner* scanner)
       : OrcColumnReader(node, slot_desc, scanner) { }
 
-  void UpdateInputBatch(orc::ColumnVectorBatch* orc_batch) override {
+  Status UpdateInputBatch(orc::ColumnVectorBatch* orc_batch) override WARN_UNUSED_RESULT {
     // Reminder: even decimal(1,1) is stored in int64 batch
     batch_ = static_cast<orc::Decimal64VectorBatch*>(orc_batch);
     DCHECK(batch_ == dynamic_cast<orc::Decimal64VectorBatch*>(orc_batch));
+    return Status::OK();
   }
 
   Status ReadValue(int row_idx, Tuple* tuple, MemPool* pool)
@@ -269,9 +305,10 @@ class OrcDecimal16ColumnReader : public OrcColumnReader {
       HdfsOrcScanner* scanner)
       : OrcColumnReader(node, slot_desc, scanner) { }
 
-  void UpdateInputBatch(orc::ColumnVectorBatch* orc_batch) override {
+  Status UpdateInputBatch(orc::ColumnVectorBatch* orc_batch) override WARN_UNUSED_RESULT {
     batch_ = static_cast<orc::Decimal128VectorBatch*>(orc_batch);
     DCHECK(batch_ == dynamic_cast<orc::Decimal128VectorBatch*>(orc_batch));
+    return Status::OK();
   }
 
   Status ReadValue(int row_idx, Tuple* tuple, MemPool* pool) override WARN_UNUSED_RESULT;
@@ -344,8 +381,9 @@ class OrcComplexColumnReader : public OrcColumnReader {
   /// Whether we've finished reading the current orc batch.
   bool EndOfBatch();
 
-  void UpdateInputBatch(orc::ColumnVectorBatch* orc_batch) override {
+  Status UpdateInputBatch(orc::ColumnVectorBatch* orc_batch) override WARN_UNUSED_RESULT {
     vbatch_ = orc_batch;
+    return Status::OK();
   }
 
   /// Assemble current collection value (tracked by 'row_idx_') into a top level 'tuple'.
@@ -385,7 +423,7 @@ class OrcStructReader : public OrcComplexColumnReader {
   OrcStructReader(const orc::Type* node, const SlotDescriptor* slot_desc,
       HdfsOrcScanner* scanner);
 
-  void UpdateInputBatch(orc::ColumnVectorBatch* orc_batch) override;
+  Status UpdateInputBatch(orc::ColumnVectorBatch* orc_batch) override WARN_UNUSED_RESULT;
 
   Status TransferTuple(Tuple* tuple, MemPool* pool) override WARN_UNUSED_RESULT;
 
@@ -442,7 +480,7 @@ class OrcListReader : public OrcCollectionReader {
   OrcListReader(const orc::Type* node, const SlotDescriptor* slot_desc,
       HdfsOrcScanner* scanner);
 
-  void UpdateInputBatch(orc::ColumnVectorBatch* orc_batch) override;
+  Status UpdateInputBatch(orc::ColumnVectorBatch* orc_batch) override WARN_UNUSED_RESULT;
 
   Status TransferTuple(Tuple* tuple, MemPool* pool) override WARN_UNUSED_RESULT;
 
@@ -474,7 +512,7 @@ class OrcMapReader : public OrcCollectionReader {
   OrcMapReader(const orc::Type* node, const SlotDescriptor* slot_desc,
       HdfsOrcScanner* scanner);
 
-  void UpdateInputBatch(orc::ColumnVectorBatch* orc_batch) override;
+  Status UpdateInputBatch(orc::ColumnVectorBatch* orc_batch) override WARN_UNUSED_RESULT;
 
   Status TransferTuple(Tuple* tuple, MemPool* pool) override WARN_UNUSED_RESULT;