Posted to commits@impala.apache.org by ta...@apache.org on 2019/09/26 20:16:10 UTC

[impala] branch master updated: IMPALA-8755: Frontend support for Z-ordering

This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new 288c8c4  IMPALA-8755: Frontend support for Z-ordering
288c8c4 is described below

commit 288c8c41b530e2a54850257c3c70d16328dbcf0b
Author: norbert.luksa <no...@cloudera.com>
AuthorDate: Mon Aug 12 12:36:53 2019 +0200

    IMPALA-8755: Frontend support for Z-ordering
    
    Extended the SQL grammar with an optional ZORDER flag and a default
    LEXICAL flag for SORT BY. If ZORDER is specified, the new 'sort.order'
    table property is set to ZORDER and the information is propagated down
    to the backend. The default order, LEXICAL, can be omitted. Examples:
    
    CREATE TABLE t (a INT, b INT) PARTITIONED BY (c INT)
      SORT BY ZORDER (a, b);
    CREATE TABLE t SORT BY ZORDER (int_col,id) LIKE u;
    CREATE TABLE t LIKE PARQUET '/foo' SORT BY ZORDER (id,zip);
    
    ALTER TABLE t SORT BY ZORDER (int_col,id);
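
    For reference, the ALTER statement above is stored via table properties;
    a rough, illustrative sketch of the equivalent property form, using the
    sort.columns / sort.order keys introduced in this patch:

    ALTER TABLE t SET TBLPROPERTIES (
      'sort.columns'='int_col,id', 'sort.order'='ZORDER');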
    
    The following two statements are equivalent:
    CREATE TABLE t (a INT, b INT) SORT BY (a, b);
    CREATE TABLE t (a INT, b INT) SORT BY LEXICAL (a, b);
    
    Z-ordering is currently not supported for strings, varchars, floats,
    and doubles. It is not suitable for strings and varchars, and support
    for floats and doubles can be added later. The supported types are:
    boolean, integer types, decimal, date, timestamp, and char.
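
    For illustration, a statement such as the following (hypothetical
    column names) is therefore rejected with an AnalysisException:

    CREATE TABLE t (a FLOAT, s STRING) SORT BY ZORDER (a, s);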
    
    Currently ZORDER provides the same functionality as a plain SORT BY
    clause, so it is hidden behind a feature flag: unlock_zorder_sort. The
    actual Z-order sort implementation will come in a later commit.
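
    When the flag is not set, ZORDER statements are rejected up front; a
    rough illustration:

    CREATE TABLE t (a INT, b INT) SORT BY ZORDER (a, b);
    -- error: Z-ordering is not yet implemented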
    
    Testing:
     * Added tests for the ZORDER option for every SORT BY test.
     * Modified some tests by adding the LEXICAL option.
     * The .test workloads are temporarily placed in separate test files
       so that the feature flag can be set up. These tests are run from
       tests/custom_cluster/test_zorder.py, which duplicates the relevant
       tests but runs them with the CustomClusterTestSuite decorator.
    
    Change-Id: Ie122002ca8f52ca2c1e1ec8ff1d476ae1f4f875d
    Reviewed-on: http://gerrit.cloudera.org:8080/13955
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/common/global-flags.cc                      |   3 +
 be/src/util/backend-gflag-util.cc                  |   2 +
 common/thrift/BackendGflags.thrift                 |   2 +
 common/thrift/DataSinks.thrift                     |   4 +
 common/thrift/JniCatalog.thrift                    |   8 +-
 common/thrift/PlanNodes.thrift                     |   2 +
 common/thrift/Types.thrift                         |   6 +
 fe/src/main/cup/sql-parser.cup                     |  77 +++--
 .../analysis/AlterTableSetTblProperties.java       |  42 ++-
 .../impala/analysis/AlterTableSortByStmt.java      |  41 +--
 .../impala/analysis/CreateTableLikeFileStmt.java   |   9 +-
 .../impala/analysis/CreateTableLikeStmt.java       |  23 +-
 .../apache/impala/analysis/CreateTableStmt.java    |   3 +
 .../org/apache/impala/analysis/DeleteStmt.java     |   4 +-
 .../org/apache/impala/analysis/InsertStmt.java     |  17 +-
 .../java/org/apache/impala/analysis/SortInfo.java  |  10 +
 .../java/org/apache/impala/analysis/TableDef.java  |  96 ++++--
 .../org/apache/impala/analysis/ToSqlUtils.java     |  56 ++--
 .../org/apache/impala/analysis/UpdateStmt.java     |   3 +-
 .../org/apache/impala/planner/AnalyticPlanner.java |  23 +-
 .../org/apache/impala/planner/ExchangeNode.java    |  16 +-
 .../org/apache/impala/planner/HdfsTableSink.java   |  16 +-
 .../java/org/apache/impala/planner/PlanNode.java   |  46 +++
 .../java/org/apache/impala/planner/Planner.java    |   3 +-
 .../java/org/apache/impala/planner/SortNode.java   |  16 +-
 .../java/org/apache/impala/planner/TableSink.java  |  25 +-
 .../org/apache/impala/service/BackendConfig.java   |   8 +
 .../apache/impala/service/CatalogOpExecutor.java   |   9 +
 fe/src/main/jflex/sql-scanner.flex                 |   2 +
 .../org/apache/impala/analysis/AnalyzeDDLTest.java |  96 +++++-
 .../apache/impala/analysis/AnalyzeKuduDDLTest.java |  25 +-
 .../org/apache/impala/analysis/ParserTest.java     |  58 ++++
 .../java/org/apache/impala/analysis/ToSqlTest.java |  79 ++++-
 .../org/apache/impala/analysis/ToSqlUtilsTest.java |  22 ++
 .../authorization/AuthorizationStmtTest.java       |   5 +
 .../org/apache/impala/planner/PlannerTest.java     |  18 ++
 .../queries/PlannerTest/insert-sort-by-zorder.test | 331 +++++++++++++++++++++
 .../queries/QueryTest/alter-table-zorder.test      | 244 +++++++++++++++
 .../QueryTest/create-table-as-select-zorder.test   |  12 +
 .../QueryTest/create-table-like-file-zorder.test   |  13 +
 .../QueryTest/create-table-like-table-zorder.test  |  31 ++
 .../queries/QueryTest/create-table-zorder.test     |  12 +
 .../QueryTest/show-create-table-zorder.test        |  25 ++
 .../queries/QueryTest/show-create-table.test       |   4 +-
 tests/custom_cluster/test_zorder.py                | 302 +++++++++++++++++++
 45 files changed, 1678 insertions(+), 171 deletions(-)

diff --git a/be/src/common/global-flags.cc b/be/src/common/global-flags.cc
index 1d8d225..49eb97a 100644
--- a/be/src/common/global-flags.cc
+++ b/be/src/common/global-flags.cc
@@ -272,6 +272,9 @@ DEFINE_bool_hidden(unlock_mt_dop, false,
 DEFINE_bool_hidden(recursively_list_partitions, true,
     "If true, recursively list the content of partition directories.");
 
+DEFINE_bool(unlock_zorder_sort, false,
+    "(Experimental) If true, enables using ZORDER option for SORT BY.");
+
 // ++========================++
 // || Startup flag graveyard ||
 // ++========================++
diff --git a/be/src/util/backend-gflag-util.cc b/be/src/util/backend-gflag-util.cc
index ab77cd0..5e7019f 100644
--- a/be/src/util/backend-gflag-util.cc
+++ b/be/src/util/backend-gflag-util.cc
@@ -78,6 +78,7 @@ DECLARE_int32(query_event_hook_nthreads);
 DECLARE_bool(is_executor);
 DECLARE_bool(use_dedicated_coordinator_estimates);
 DECLARE_string(blacklisted_dbs);
+DECLARE_bool(unlock_zorder_sort);
 DECLARE_string(blacklisted_tables);
 
 namespace impala {
@@ -160,6 +161,7 @@ Status GetThriftBackendGflags(JNIEnv* jni_env, jbyteArray* cfg_bytes) {
   cfg.__set_use_dedicated_coordinator_estimates(
       FLAGS_use_dedicated_coordinator_estimates);
   cfg.__set_blacklisted_dbs(FLAGS_blacklisted_dbs);
+  cfg.__set_unlock_zorder_sort(FLAGS_unlock_zorder_sort);
   cfg.__set_blacklisted_tables(FLAGS_blacklisted_tables);
   RETURN_IF_ERROR(SerializeThriftMsg(jni_env, &cfg, cfg_bytes));
   return Status::OK();
diff --git a/common/thrift/BackendGflags.thrift b/common/thrift/BackendGflags.thrift
index 08c072c..0038559 100644
--- a/common/thrift/BackendGflags.thrift
+++ b/common/thrift/BackendGflags.thrift
@@ -139,4 +139,6 @@ struct TBackendGflags {
   57: required string blacklisted_dbs
 
   58: required string blacklisted_tables
+
+  59: required bool unlock_zorder_sort
 }
diff --git a/common/thrift/DataSinks.thrift b/common/thrift/DataSinks.thrift
index ddb15d5..19dd86b 100644
--- a/common/thrift/DataSinks.thrift
+++ b/common/thrift/DataSinks.thrift
@@ -81,6 +81,10 @@ struct THdfsTableSink {
 
   // Stores the allocated ACID write id if the target table is transactional.
   6: optional i64 write_id
+
+  // Sorting order. If not lexical, the backend should not populate the
+  // RowGroup::sorting_columns list in parquet files.
+  7: required Types.TSortingOrder sorting_order
 }
 
 // Structure to encapsulate specific options that are passed down to the KuduTableSink
diff --git a/common/thrift/JniCatalog.thrift b/common/thrift/JniCatalog.thrift
index 40b40a1..94641d3 100644
--- a/common/thrift/JniCatalog.thrift
+++ b/common/thrift/JniCatalog.thrift
@@ -471,6 +471,9 @@ struct TCreateTableLikeParams {
   // The server name for security privileges when authorization is enabled.
   // TODO: Need to cleanup:IMPALA-7553
   10: optional string server_name
+
+  // The sorting order used in SORT BY clauses.
+  11: required Types.TSortingOrder sorting_order
 }
 
 // Parameters of CREATE TABLE commands
@@ -529,6 +532,9 @@ struct TCreateTableParams {
   // The server name for security privileges when authorization is enabled.
   // TODO: Need to cleanup:IMPALA-7553
   17: optional string server_name
+
+  // The sorting order used in SORT BY clauses.
+  18: required Types.TSortingOrder sorting_order
 }
 
 // Parameters of a CREATE VIEW or ALTER VIEW AS SELECT command
@@ -804,4 +810,4 @@ struct TEventProcessorMetricsSummaryResponse {
   // summary view of the events processor which can include status,
   // metrics and other details
   1: required string summary
-}
\ No newline at end of file
+}
diff --git a/common/thrift/PlanNodes.thrift b/common/thrift/PlanNodes.thrift
index b308a72..b8b76d1 100644
--- a/common/thrift/PlanNodes.thrift
+++ b/common/thrift/PlanNodes.thrift
@@ -416,6 +416,8 @@ struct TSortInfo {
   // Expressions evaluated over the input row that materialize the tuple to be sorted.
   // Contains one expr per slot in the materialized tuple.
   4: optional list<Exprs.TExpr> sort_tuple_slot_exprs
+  // The sorting order used in SORT BY clauses.
+  5: required Types.TSortingOrder sorting_order
 }
 
 enum TSortType {
diff --git a/common/thrift/Types.thrift b/common/thrift/Types.thrift
index 1bc3233..0c7ff71 100644
--- a/common/thrift/Types.thrift
+++ b/common/thrift/Types.thrift
@@ -264,3 +264,9 @@ struct TFunction {
   // NOTE: when adding fields to this struct, do not renumber the field IDs or
   // add new required fields. This struct is serialized into user metastores.
 }
+
+// The sorting order used in SORT BY queries.
+enum TSortingOrder {
+  LEXICAL = 0
+  ZORDER = 1
+}
diff --git a/fe/src/main/cup/sql-parser.cup b/fe/src/main/cup/sql-parser.cup
index e428b2d..159e815 100644
--- a/fe/src/main/cup/sql-parser.cup
+++ b/fe/src/main/cup/sql-parser.cup
@@ -54,6 +54,9 @@ import org.apache.impala.thrift.TQueryOptions;
 import org.apache.impala.thrift.TShowStatsOp;
 import org.apache.impala.thrift.TTablePropertyType;
 import org.apache.impala.thrift.TPrincipalType;
+import org.apache.impala.thrift.TSortingOrder;
+import org.apache.impala.service.BackendConfig;
+import org.apache.impala.common.NotImplementedException;
 
 parser code {:
   private Symbol errorToken_;
@@ -285,20 +288,20 @@ terminal
   KW_FOR, KW_FORMAT, KW_FORMATTED, KW_FROM, KW_FULL, KW_FUNCTION, KW_FUNCTIONS, KW_GRANT,
   KW_GROUP, KW_HASH, KW_IGNORE, KW_HAVING, KW_IF, KW_ILIKE, KW_IN, KW_INCREMENTAL,
   KW_INIT_FN, KW_INNER, KW_INPATH, KW_INSERT, KW_INT, KW_INTERMEDIATE, KW_INTERVAL,
-  KW_INTO, KW_INVALIDATE, KW_IREGEXP, KW_IS, KW_JOIN, KW_KUDU, KW_LAST, KW_LEFT, KW_LIKE,
-  KW_LIMIT, KW_LINES, KW_LOAD, KW_LOCATION, KW_MAP, KW_MERGE_FN, KW_METADATA, KW_NOT,
-  KW_NULL, KW_NULLS, KW_OFFSET, KW_ON, KW_OR, KW_ORC, KW_ORDER, KW_OUTER, KW_OVER,
-  KW_OVERWRITE, KW_PARQUET, KW_PARQUETFILE, KW_PARTITION, KW_PARTITIONED, KW_PARTITIONS,
-  KW_PRECEDING, KW_PREPARE_FN, KW_PRIMARY, KW_PRODUCED, KW_PURGE, KW_RANGE, KW_RCFILE,
-  KW_RECOVER, KW_REFRESH, KW_REGEXP, KW_RENAME, KW_REPEATABLE, KW_REPLACE, KW_REPLICATION,
-  KW_RESTRICT, KW_RETURNS, KW_REVOKE, KW_RIGHT, KW_RLIKE, KW_ROLE, KW_ROLES, KW_ROW,
-  KW_ROWS, KW_SCHEMA, KW_SCHEMAS, KW_SELECT, KW_SEMI, KW_SEQUENCEFILE, KW_SERDEPROPERTIES,
-  KW_SERIALIZE_FN, KW_SET, KW_SHOW, KW_SMALLINT, KW_SORT, KW_STORED, KW_STRAIGHT_JOIN,
-  KW_STRING, KW_STRUCT, KW_SYMBOL, KW_TABLE, KW_TABLES, KW_TABLESAMPLE, KW_TBLPROPERTIES,
-  KW_TERMINATED, KW_TEXTFILE, KW_THEN, KW_TIMESTAMP, KW_TINYINT, KW_TRUNCATE, KW_STATS,
-  KW_TO, KW_TRUE, KW_UNBOUNDED, KW_UNCACHED, KW_UNION, KW_UNKNOWN, KW_UPDATE,
-  KW_UPDATE_FN, KW_UPSERT, KW_USE, KW_USING, KW_VALUES, KW_VARCHAR, KW_VIEW, KW_WHEN,
-  KW_WHERE, KW_WITH;
+  KW_INTO, KW_INVALIDATE, KW_IREGEXP, KW_IS, KW_JOIN, KW_KUDU, KW_LAST, KW_LEFT,
+  KW_LEXICAL, KW_LIKE, KW_LIMIT, KW_LINES, KW_LOAD, KW_LOCATION, KW_MAP, KW_MERGE_FN,
+  KW_METADATA, KW_NOT, KW_NULL, KW_NULLS, KW_OFFSET, KW_ON, KW_OR, KW_ORC, KW_ORDER,
+  KW_OUTER, KW_OVER, KW_OVERWRITE, KW_PARQUET, KW_PARQUETFILE, KW_PARTITION,
+  KW_PARTITIONED, KW_PARTITIONS, KW_PRECEDING, KW_PREPARE_FN, KW_PRIMARY, KW_PRODUCED,
+  KW_PURGE, KW_RANGE, KW_RCFILE, KW_RECOVER, KW_REFRESH, KW_REGEXP, KW_RENAME,
+  KW_REPEATABLE, KW_REPLACE, KW_REPLICATION, KW_RESTRICT, KW_RETURNS, KW_REVOKE, KW_RIGHT,
+  KW_RLIKE, KW_ROLE, KW_ROLES, KW_ROW, KW_ROWS, KW_SCHEMA, KW_SCHEMAS, KW_SELECT, KW_SEMI,
+  KW_SEQUENCEFILE, KW_SERDEPROPERTIES, KW_SERIALIZE_FN, KW_SET, KW_SHOW, KW_SMALLINT,
+  KW_SORT, KW_STORED, KW_STRAIGHT_JOIN, KW_STRING, KW_STRUCT, KW_SYMBOL, KW_TABLE,
+  KW_TABLES, KW_TABLESAMPLE, KW_TBLPROPERTIES, KW_TERMINATED, KW_TEXTFILE, KW_THEN,
+  KW_TIMESTAMP, KW_TINYINT, KW_TRUNCATE, KW_STATS, KW_TO, KW_TRUE, KW_UNBOUNDED,
+  KW_UNCACHED, KW_UNION, KW_UNKNOWN, KW_UPDATE, KW_UPDATE_FN, KW_UPSERT, KW_USE, KW_USING,
+  KW_VALUES, KW_VARCHAR, KW_VIEW, KW_WHEN, KW_WHERE, KW_WITH, KW_ZORDER;
 
 terminal UNUSED_RESERVED_WORD;
 
@@ -368,7 +371,8 @@ nonterminal Expr expr, non_pred_expr, arithmetic_expr, timestamp_arithmetic_expr
 nonterminal List<Expr> expr_list;
 nonterminal String alias_clause;
 nonterminal List<String> ident_list, primary_keys;
-nonterminal List<String> opt_ident_list, opt_sort_cols;
+nonterminal List<String> opt_ident_list;
+nonterminal Pair<List<String>, TSortingOrder> opt_sort_cols;
 nonterminal TableName table_name;
 nonterminal ColumnName column_name;
 nonterminal FunctionName function_name;
@@ -1199,7 +1203,18 @@ alter_tbl_stmt ::=
   {: RESULT = new AlterTableSetTblProperties(table, partitions, target, properties); :}
   | KW_ALTER KW_TABLE table_name:table KW_SORT KW_BY LPAREN opt_ident_list:col_names
     RPAREN
-  {: RESULT = new AlterTableSortByStmt(table, col_names); :}
+  {: RESULT = new AlterTableSortByStmt(table, col_names, TSortingOrder.LEXICAL); :}
+  | KW_ALTER KW_TABLE table_name:table KW_SORT KW_BY KW_LEXICAL LPAREN opt_ident_list:col_names
+    RPAREN
+  {: RESULT = new AlterTableSortByStmt(table, col_names, TSortingOrder.LEXICAL); :}
+  | KW_ALTER KW_TABLE table_name:table KW_SORT KW_BY KW_ZORDER LPAREN opt_ident_list:col_names
+    RPAREN
+  {:
+    if (!BackendConfig.INSTANCE.isZOrderSortUnlocked()) {
+      throw new NotImplementedException("Z-ordering is not yet implemented");
+    }
+    RESULT = new AlterTableSortByStmt(table, col_names, TSortingOrder.ZORDER);
+  :}
   | KW_ALTER KW_TABLE table_name:table opt_partition_set:partition KW_SET
     KW_COLUMN KW_STATS ident_or_default:col LPAREN properties_map:map RPAREN
   {:
@@ -1382,8 +1397,9 @@ create_tbl_like_stmt ::=
   opt_comment_val:comment
   file_format_create_table_val:file_format location_val:location
   {:
-    RESULT = new CreateTableLikeStmt(tbl_def.getTblName(), null, other_table,
-        tbl_def.isExternal(), comment, file_format, location, tbl_def.getIfNotExists());
+    RESULT = new CreateTableLikeStmt(tbl_def.getTblName(),
+        new Pair<>(null, TSortingOrder.LEXICAL), other_table, tbl_def.isExternal(),
+        comment, file_format, location, tbl_def.getIfNotExists());
   :}
   // This extra production is necessary since without it the parser will not be able to
   // parse "CREATE TABLE A LIKE B".
@@ -1442,9 +1458,26 @@ tbl_options ::=
 
 opt_sort_cols ::=
   KW_SORT KW_BY LPAREN opt_ident_list:col_names RPAREN
-  {: RESULT = col_names; :}
+  {:
+    RESULT = new Pair<List<String>, TSortingOrder>(
+      col_names, TSortingOrder.LEXICAL);
+  :}
+  | KW_SORT KW_BY KW_LEXICAL LPAREN opt_ident_list:col_names RPAREN
+  {:
+    RESULT = new Pair<List<String>, TSortingOrder>(
+      col_names, TSortingOrder.LEXICAL);
+  :}
+  | KW_SORT KW_BY KW_ZORDER LPAREN opt_ident_list:col_names RPAREN
+  {:
+    if (!BackendConfig.INSTANCE.isZOrderSortUnlocked()) {
+      throw new NotImplementedException("Z-ordering is not yet implemented");
+    }
+    RESULT = new Pair<List<String>, TSortingOrder>(
+      col_names, TSortingOrder.ZORDER);
+  :}
   | /* empty */
-  {: RESULT = null; :}
+  {: RESULT = new Pair<List<String>, TSortingOrder>(
+      null, TSortingOrder.LEXICAL); :}
   ;
 
 opt_tbl_data_layout ::=
@@ -3680,6 +3713,8 @@ word ::=
   {: RESULT = r.toString(); :}
   | KW_LEFT:r
   {: RESULT = r.toString(); :}
+  | KW_LEXICAL:r
+  {: RESULT = r.toString(); :}
   | KW_LIKE:r
   {: RESULT = r.toString(); :}
   | KW_LIMIT:r
@@ -3862,6 +3897,8 @@ word ::=
   {: RESULT = r.toString(); :}
   | KW_WITH:r
   {: RESULT = r.toString(); :}
+  | KW_ZORDER:r
+  {: RESULT = r.toString(); :}
   | UNUSED_RESERVED_WORD:r
   {: RESULT = r.toString(); :}
   ;
diff --git a/fe/src/main/java/org/apache/impala/analysis/AlterTableSetTblProperties.java b/fe/src/main/java/org/apache/impala/analysis/AlterTableSetTblProperties.java
index 92ea8b5..7bf50e1 100644
--- a/fe/src/main/java/org/apache/impala/analysis/AlterTableSetTblProperties.java
+++ b/fe/src/main/java/org/apache/impala/analysis/AlterTableSetTblProperties.java
@@ -34,9 +34,11 @@ import org.apache.impala.catalog.HdfsTable;
 import org.apache.impala.catalog.KuduTable;
 import org.apache.impala.catalog.Table;
 import org.apache.impala.common.AnalysisException;
+import org.apache.impala.common.Pair;
 import org.apache.impala.thrift.TAlterTableParams;
 import org.apache.impala.thrift.TAlterTableSetTblPropertiesParams;
 import org.apache.impala.thrift.TAlterTableType;
+import org.apache.impala.thrift.TSortingOrder;
 import org.apache.impala.thrift.TTablePropertyType;
 import org.apache.impala.util.AvroSchemaParser;
 import org.apache.impala.util.AvroSchemaUtils;
@@ -201,30 +203,42 @@ public class AlterTableSetTblProperties extends AlterTableSetStmt {
    * 'table'. The property must store a list of column names separated by commas, and each
    * column in the property must occur in 'table' as a non-partitioning column. If there
    * are errors during the analysis, this function will throw an AnalysisException.
-   * Returns a list of positions of the sort columns within the table's list of
-   * columns.
+   * Returns a pair of list of positions of the sort columns within the table's list of
+   * columns and the corresponding sorting order.
    */
-  public static List<Integer> analyzeSortColumns(FeTable table,
+  public static Pair<List<Integer>, TSortingOrder> analyzeSortColumns(FeTable table,
       Map<String, String> tblProperties) throws AnalysisException {
-    if (!tblProperties.containsKey(
-        AlterTableSortByStmt.TBL_PROP_SORT_COLUMNS)) {
-      return new ArrayList<>();
+
+    boolean containsOrderingProperties =
+        tblProperties.containsKey(AlterTableSortByStmt.TBL_PROP_SORT_ORDER);
+    boolean containsSortingColumnProperties = tblProperties
+        .containsKey(AlterTableSortByStmt.TBL_PROP_SORT_COLUMNS);
+
+    if ((containsOrderingProperties || containsSortingColumnProperties) &&
+        table instanceof FeKuduTable) {
+      throw new AnalysisException("'sort.*' table properties are not "
+          + "supported for Kudu tables.");
+    }
+
+    TSortingOrder sortingOrder = TSortingOrder.LEXICAL;
+    if (containsOrderingProperties) {
+      sortingOrder = TSortingOrder.valueOf(tblProperties.get(
+          AlterTableSortByStmt.TBL_PROP_SORT_ORDER));
+    }
+    if (!containsSortingColumnProperties) {
+      return new Pair<List<Integer>, TSortingOrder>(new ArrayList<Integer>(),
+          sortingOrder);
     }
 
     // ALTER TABLE SET is not supported on HBase tables at all, see
     // AlterTableSetStmt::analyze().
     Preconditions.checkState(!(table instanceof FeHBaseTable));
 
-    if (table instanceof FeKuduTable) {
-      throw new AnalysisException(String.format("'%s' table property is not supported " +
-          "for Kudu tables.", AlterTableSortByStmt.TBL_PROP_SORT_COLUMNS));
-    }
-
     List<String> sortCols = Lists.newArrayList(
         Splitter.on(",").trimResults().omitEmptyStrings().split(
         tblProperties.get(AlterTableSortByStmt.TBL_PROP_SORT_COLUMNS)));
-    return TableDef.analyzeSortColumns(sortCols,
-        Column.toColumnNames(table.getNonClusteringColumns()),
-        Column.toColumnNames(table.getClusteringColumns()));
+
+    return new Pair<>(TableDef.analyzeSortColumns(sortCols, table, sortingOrder),
+        sortingOrder);
   }
 }
diff --git a/fe/src/main/java/org/apache/impala/analysis/AlterTableSortByStmt.java b/fe/src/main/java/org/apache/impala/analysis/AlterTableSortByStmt.java
index 6133776..a5f2e54 100644
--- a/fe/src/main/java/org/apache/impala/analysis/AlterTableSortByStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/AlterTableSortByStmt.java
@@ -29,52 +29,59 @@ import org.apache.impala.thrift.TAlterTableParams;
 import org.apache.impala.thrift.TAlterTableSetTblPropertiesParams;
 import org.apache.impala.thrift.TAlterTableType;
 import org.apache.impala.thrift.TTablePropertyType;
-
+import org.apache.impala.thrift.TSortingOrder;
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 
 /**
-* Represents an ALTER TABLE SORT BY (c1, c2, ...) statement.
+* Represents an ALTER TABLE SORT BY [LEXICAL|ZORDER] (c1, c2, ...) statement.
+*
 */
 public class AlterTableSortByStmt extends AlterTableStmt {
   // Table property key for sort.columns
   public static final String TBL_PROP_SORT_COLUMNS = "sort.columns";
+  public static final String TBL_PROP_SORT_ORDER = "sort.order";
 
   private final List<String> columns_;
+  private final TSortingOrder sortingOrder_;
 
-  public AlterTableSortByStmt(TableName tableName, List<String> columns) {
+  public AlterTableSortByStmt(TableName tableName, List<String> columns,
+      TSortingOrder sortingOrder) {
     super(tableName);
     Preconditions.checkNotNull(columns);
     columns_ = columns;
+    sortingOrder_ = sortingOrder;
   }
 
   @Override
   public TAlterTableParams toThrift() {
-   TAlterTableParams params = super.toThrift();
-   params.setAlter_type(TAlterTableType.SET_TBL_PROPERTIES);
-   TAlterTableSetTblPropertiesParams tblPropertyParams =
-       new TAlterTableSetTblPropertiesParams();
-   tblPropertyParams.setTarget(TTablePropertyType.TBL_PROPERTY);
-   Map<String, String> properties = new HashMap<>();
-   properties.put(TBL_PROP_SORT_COLUMNS, Joiner.on(",").join(columns_));
-   tblPropertyParams.setProperties(properties);
-   params.setSet_tbl_properties_params(tblPropertyParams);
-   return params;
+    TAlterTableParams params = super.toThrift();
+    params.setAlter_type(TAlterTableType.SET_TBL_PROPERTIES);
+    TAlterTableSetTblPropertiesParams tblPropertyParams =
+        new TAlterTableSetTblPropertiesParams();
+    tblPropertyParams.setTarget(TTablePropertyType.TBL_PROPERTY);
+    Map<String, String> properties = new HashMap<>();
+    properties.put(TBL_PROP_SORT_COLUMNS, Joiner.on(",").join(columns_));
+    properties.put(TBL_PROP_SORT_ORDER, sortingOrder_.toString());
+    tblPropertyParams.setProperties(properties);
+    params.setSet_tbl_properties_params(tblPropertyParams);
+    return params;
   }
 
   @Override
   public void analyze(Analyzer analyzer) throws AnalysisException {
     super.analyze(analyzer);
-
     // Disallow setting sort columns on HBase and Kudu tables.
     FeTable targetTable = getTargetTable();
     if (targetTable instanceof FeHBaseTable) {
-      throw new AnalysisException("ALTER TABLE SORT BY not supported on HBase tables.");
+      throw new AnalysisException(String.format("ALTER TABLE SORT BY not supported "
+          + "on HBase tables."));
     }
     if (targetTable instanceof FeKuduTable) {
-      throw new AnalysisException("ALTER TABLE SORT BY not supported on Kudu tables.");
+      throw new AnalysisException(String.format("ALTER TABLE SORT BY not supported "
+          + "on Kudu tables."));
     }
 
-    TableDef.analyzeSortColumns(columns_, targetTable);
+    TableDef.analyzeSortColumns(columns_, targetTable, sortingOrder_);
   }
 }
diff --git a/fe/src/main/java/org/apache/impala/analysis/CreateTableLikeFileStmt.java b/fe/src/main/java/org/apache/impala/analysis/CreateTableLikeFileStmt.java
index 5071842..10b34af 100644
--- a/fe/src/main/java/org/apache/impala/analysis/CreateTableLikeFileStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/CreateTableLikeFileStmt.java
@@ -21,11 +21,11 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.hadoop.fs.permission.FsAction;
-
 import org.apache.impala.authorization.Privilege;
 import org.apache.impala.catalog.HdfsCompression;
 import org.apache.impala.catalog.HdfsFileFormat;
 import org.apache.impala.common.AnalysisException;
+import org.apache.impala.common.Pair;
 import org.apache.impala.thrift.THdfsFileFormat;
 
 
@@ -53,9 +53,10 @@ public class CreateTableLikeFileStmt extends CreateTableStmt {
         schemaLocation_.toString());
     String s = ToSqlUtils.getCreateTableSql(getDb(),
         getTbl() + " __LIKE_FILEFORMAT__ ",  getComment(), colsSql, partitionColsSql,
-        null, null, getSortColumns(), getTblProperties(), getSerdeProperties(),
-        isExternal(), getIfNotExists(), getRowFormat(),
-        HdfsFileFormat.fromThrift(getFileFormat()), compression, null, getLocation());
+        null, null, new Pair<>(getSortColumns(), getSortingOrder()),
+        getTblProperties(), getSerdeProperties(), isExternal(), getIfNotExists(),
+        getRowFormat(), HdfsFileFormat.fromThrift(getFileFormat()), compression, null,
+        getLocation());
     s = s.replace("__LIKE_FILEFORMAT__", String.format("LIKE %s '%s'",
         schemaFileFormat_, schemaLocation_.toString()));
     return s;
diff --git a/fe/src/main/java/org/apache/impala/analysis/CreateTableLikeStmt.java b/fe/src/main/java/org/apache/impala/analysis/CreateTableLikeStmt.java
index 888b550..cecb214 100644
--- a/fe/src/main/java/org/apache/impala/analysis/CreateTableLikeStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/CreateTableLikeStmt.java
@@ -24,10 +24,12 @@ import org.apache.impala.authorization.Privilege;
 import org.apache.impala.catalog.FeTable;
 import org.apache.impala.catalog.KuduTable;
 import org.apache.impala.common.AnalysisException;
+import org.apache.impala.common.Pair;
 import org.apache.impala.thrift.TAccessEvent;
 import org.apache.impala.thrift.TCatalogObjectType;
 import org.apache.impala.thrift.TCreateTableLikeParams;
 import org.apache.impala.thrift.THdfsFileFormat;
+import org.apache.impala.thrift.TSortingOrder;
 import org.apache.impala.thrift.TTableName;
 
 import com.google.common.base.Joiner;
@@ -40,6 +42,7 @@ import com.google.common.base.Preconditions;
 public class CreateTableLikeStmt extends StatementBase {
   private final TableName tableName_;
   private final List<String> sortColumns_;
+  private final TSortingOrder sortingOrder_;
   private final TableName srcTableName_;
   private final boolean isExternal_;
   private final String comment_;
@@ -58,7 +61,8 @@ public class CreateTableLikeStmt extends StatementBase {
   /**
    * Builds a CREATE TABLE LIKE statement
    * @param tableName - Name of the new table
-   * @param sortColumns - List of columns to sort by during inserts
+   * @param sortProperties - A pair, containing the list of columns to sort by during
+   *                         inserts and the order used in SORT BY queries.
    * @param srcTableName - Name of the source table (table to copy)
    * @param isExternal - If true, the table's data will be preserved if dropped.
    * @param comment - Comment to attach to the table
@@ -66,13 +70,15 @@ public class CreateTableLikeStmt extends StatementBase {
    * @param location - The HDFS location of where the table data will stored.
    * @param ifNotExists - If true, no errors are thrown if the table already exists
    */
-  public CreateTableLikeStmt(TableName tableName, List<String> sortColumns,
-      TableName srcTableName, boolean isExternal, String comment,
-      THdfsFileFormat fileFormat, HdfsUri location, boolean ifNotExists) {
+  public CreateTableLikeStmt(TableName tableName,
+      Pair<List<String>, TSortingOrder> sortProperties, TableName srcTableName,
+      boolean isExternal, String comment, THdfsFileFormat fileFormat, HdfsUri location,
+      boolean ifNotExists) {
     Preconditions.checkNotNull(tableName);
     Preconditions.checkNotNull(srcTableName);
     this.tableName_ = tableName;
-    this.sortColumns_ = sortColumns;
+    this.sortColumns_ = sortProperties.first;
+    this.sortingOrder_ = sortProperties.second;
     this.srcTableName_ = srcTableName;
     this.isExternal_ = isExternal;
     this.comment_ = comment;
@@ -87,6 +93,7 @@ public class CreateTableLikeStmt extends StatementBase {
   public boolean getIfNotExists() { return ifNotExists_; }
   public THdfsFileFormat getFileFormat() { return fileFormat_; }
   public HdfsUri getLocation() { return location_; }
+  public TSortingOrder getSortingOrder() { return sortingOrder_; }
 
   /**
    * Can only be called after analysis, returns the name of the database the table will
@@ -120,7 +127,8 @@ public class CreateTableLikeStmt extends StatementBase {
     if (tableName_.getDb() != null) sb.append(tableName_.getDb() + ".");
     sb.append(tableName_.getTbl() + " ");
     if (sortColumns_ != null && !sortColumns_.isEmpty()) {
-      sb.append("SORT BY (" + Joiner.on(",").join(sortColumns_) + ") ");
+      sb.append(String.format("SORT BY %s (%s) ", sortingOrder_.toString(),
+          Joiner.on(",").join(sortColumns_)));
     }
     sb.append("LIKE ");
     if (srcTableName_.getDb() != null) sb.append(srcTableName_.getDb() + ".");
@@ -143,6 +151,7 @@ public class CreateTableLikeStmt extends StatementBase {
     params.setIf_not_exists(getIfNotExists());
     params.setSort_columns(sortColumns_);
     params.setServer_name(serverName_);
+    params.setSorting_order(sortingOrder_);
     return params;
   }
 
@@ -193,7 +202,7 @@ public class CreateTableLikeStmt extends StatementBase {
     }
 
     if (sortColumns_ != null) {
-      TableDef.analyzeSortColumns(sortColumns_, srcTable);
+      TableDef.analyzeSortColumns(sortColumns_, srcTable, sortingOrder_);
     }
   }
 }
diff --git a/fe/src/main/java/org/apache/impala/analysis/CreateTableStmt.java b/fe/src/main/java/org/apache/impala/analysis/CreateTableStmt.java
index aef5e5c..0a1ea77 100644
--- a/fe/src/main/java/org/apache/impala/analysis/CreateTableStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/CreateTableStmt.java
@@ -30,6 +30,7 @@ import org.apache.impala.common.AnalysisException;
 import org.apache.impala.common.ImpalaRuntimeException;
 import org.apache.impala.thrift.TCreateTableParams;
 import org.apache.impala.thrift.THdfsFileFormat;
+import org.apache.impala.thrift.TSortingOrder;
 import org.apache.impala.thrift.TTableName;
 import org.apache.impala.util.AvroSchemaConverter;
 import org.apache.impala.util.AvroSchemaParser;
@@ -108,6 +109,7 @@ public class CreateTableStmt extends StatementBase {
     return tableDef_.getKuduPartitionParams();
   }
   public List<String> getSortColumns() { return tableDef_.getSortColumns(); }
+  public TSortingOrder getSortingOrder() { return tableDef_.getSortingOrder(); }
   public String getComment() { return tableDef_.getComment(); }
   Map<String, String> getTblProperties() { return tableDef_.getTblProperties(); }
   private HdfsCachingOp getCachingOp() { return tableDef_.getCachingOp(); }
@@ -170,6 +172,7 @@ public class CreateTableStmt extends StatementBase {
     params.setFile_format(getFileFormat());
     params.setIf_not_exists(getIfNotExists());
     params.setSort_columns(getSortColumns());
+    params.setSorting_order(getSortingOrder());
     params.setTable_properties(Maps.newHashMap(getTblProperties()));
     params.getTable_properties().putAll(Maps.newHashMap(getGeneratedKuduProperties()));
     params.setSerde_properties(getSerdeProperties());
diff --git a/fe/src/main/java/org/apache/impala/analysis/DeleteStmt.java b/fe/src/main/java/org/apache/impala/analysis/DeleteStmt.java
index 796486b..29aced1 100644
--- a/fe/src/main/java/org/apache/impala/analysis/DeleteStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/DeleteStmt.java
@@ -20,8 +20,10 @@ package org.apache.impala.analysis;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.impala.common.Pair;
 import org.apache.impala.planner.DataSink;
 import org.apache.impala.planner.TableSink;
+import org.apache.impala.thrift.TSortingOrder;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
@@ -56,7 +58,7 @@ public class DeleteStmt extends ModifyStmt {
     Preconditions.checkState(table_ != null);
     TableSink tableSink = TableSink.create(table_, TableSink.Op.DELETE,
         ImmutableList.<Expr>of(), resultExprs, referencedColumns_, false, false,
-        ImmutableList.<Integer>of());
+        new Pair<>(ImmutableList.<Integer> of(), TSortingOrder.LEXICAL));
     Preconditions.checkState(!referencedColumns_.isEmpty());
     return tableSink;
   }
diff --git a/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java b/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
index a5371ec..1e7dd6d 100644
--- a/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
@@ -35,9 +35,11 @@ import org.apache.impala.catalog.Type;
 import org.apache.impala.catalog.View;
 import org.apache.impala.common.AnalysisException;
 import org.apache.impala.common.FileSystemUtil;
+import org.apache.impala.common.Pair;
 import org.apache.impala.planner.DataSink;
 import org.apache.impala.planner.TableSink;
 import org.apache.impala.rewrite.ExprRewriter;
+import org.apache.impala.thrift.TSortingOrder;
 
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
@@ -154,6 +156,9 @@ public class InsertStmt extends StatementBase {
   // sent to the backend to populate the RowGroup::sorting_columns list in parquet files.
   private List<Integer> sortColumns_ = new ArrayList<>();
 
+  // The order used in SORT BY queries.
+  private TSortingOrder sortingOrder_ = TSortingOrder.LEXICAL;
+
   // Output expressions that produce the final results to write to the target table. May
   // include casts. Set in prepareExpressions().
   // If this is an INSERT on a non-Kudu table, it will contain one Expr for all
@@ -253,6 +258,7 @@ public class InsertStmt extends StatementBase {
     hasNoClusteredHint_ = false;
     sortExprs_.clear();
     sortColumns_.clear();
+    sortingOrder_ = TSortingOrder.LEXICAL;
     resultExprs_.clear();
     mentionedColumns_.clear();
     primaryKeyExprs_.clear();
@@ -387,7 +393,8 @@ public class InsertStmt extends StatementBase {
     // Populate partitionKeyExprs from partitionKeyValues and selectExprTargetColumns
     prepareExpressions(selectExprTargetColumns, selectListExprs, table_, analyzer);
 
-    // Analyze 'sort.columns' table property and populate sortColumns_ and sortExprs_.
+    // Analyze 'sort.columns' and 'sort.order' table properties and populate
+    // sortColumns_, sortExprs_, and sortingOrder_.
     analyzeSortColumns();
 
     // Analyze plan hints at the end to prefer reporting other error messages first
@@ -832,8 +839,11 @@ public class InsertStmt extends StatementBase {
   private void analyzeSortColumns() throws AnalysisException {
     if (!(table_ instanceof FeFsTable)) return;
 
-    sortColumns_ = AlterTableSetTblProperties.analyzeSortColumns(table_,
+    Pair<List<Integer>, TSortingOrder> sortProperties =
+        AlterTableSetTblProperties.analyzeSortColumns(table_,
         table_.getMetaStoreTable().getParameters());
+    sortColumns_ = sortProperties.first;
+    sortingOrder_ = sortProperties.second;
 
     // Assign sortExprs_ based on sortColumns_.
     for (Integer colIdx: sortColumns_) sortExprs_.add(resultExprs_.get(colIdx));
@@ -907,6 +917,7 @@ public class InsertStmt extends StatementBase {
   public void setTargetTable(FeTable table) { this.table_ = table; }
   public void setWriteId(long writeId) { this.writeId_ = writeId; }
   public boolean isOverwrite() { return overwrite_; }
+  public TSortingOrder getSortingOrder() { return sortingOrder_; }
 
   /**
    * Only valid after analysis
@@ -941,7 +952,7 @@ public class InsertStmt extends StatementBase {
     Preconditions.checkState(table_ != null);
     return TableSink.create(table_, isUpsert_ ? TableSink.Op.UPSERT : TableSink.Op.INSERT,
         partitionKeyExprs_, resultExprs_, mentionedColumns_, overwrite_,
-        requiresClustering(), sortColumns_, writeId_);
+        requiresClustering(), new Pair<>(sortColumns_, sortingOrder_), writeId_);
   }
 
   /**
diff --git a/fe/src/main/java/org/apache/impala/analysis/SortInfo.java b/fe/src/main/java/org/apache/impala/analysis/SortInfo.java
index abe6bff..edbdfea 100644
--- a/fe/src/main/java/org/apache/impala/analysis/SortInfo.java
+++ b/fe/src/main/java/org/apache/impala/analysis/SortInfo.java
@@ -25,6 +25,7 @@ import java.util.Set;
 
 import org.apache.impala.common.TreeNode;
 import org.apache.impala.planner.PlanNode;
+import org.apache.impala.thrift.TSortingOrder;
 
 import com.google.common.base.Preconditions;
 import com.google.common.base.Predicates;
@@ -59,9 +60,15 @@ public class SortInfo {
   private final List<Expr> materializedExprs_;
   // Maps from exprs materialized into the sort tuple to their corresponding SlotRefs.
   private final ExprSubstitutionMap outputSmap_;
+  private TSortingOrder sortingOrder_;
 
   public SortInfo(List<Expr> sortExprs, List<Boolean> isAscOrder,
       List<Boolean> nullsFirstParams) {
+    this(sortExprs, isAscOrder, nullsFirstParams, TSortingOrder.LEXICAL);
+  }
+
+  public SortInfo(List<Expr> sortExprs, List<Boolean> isAscOrder,
+      List<Boolean> nullsFirstParams, TSortingOrder sortingOrder) {
     Preconditions.checkArgument(sortExprs.size() == isAscOrder.size());
     Preconditions.checkArgument(sortExprs.size() == nullsFirstParams.size());
     sortExprs_ = sortExprs;
@@ -69,6 +76,7 @@ public class SortInfo {
     nullsFirstParams_ = nullsFirstParams;
     materializedExprs_ = new ArrayList<>();
     outputSmap_ = new ExprSubstitutionMap();
+    sortingOrder_ = sortingOrder;
   }
 
   /**
@@ -81,6 +89,7 @@ public class SortInfo {
     materializedExprs_ = Expr.cloneList(other.materializedExprs_);
     sortTupleDesc_ = other.sortTupleDesc_;
     outputSmap_ = other.outputSmap_.clone();
+    sortingOrder_ = other.sortingOrder_;
   }
 
   public List<Expr> getSortExprs() { return sortExprs_; }
@@ -89,6 +98,7 @@ public class SortInfo {
   public List<Expr> getMaterializedExprs() { return materializedExprs_; }
   public TupleDescriptor getSortTupleDescriptor() { return sortTupleDesc_; }
   public ExprSubstitutionMap getOutputSmap() { return outputSmap_; }
+  public TSortingOrder getSortingOrder() { return sortingOrder_; }
 
   /**
    * Gets the list of booleans indicating whether nulls come first or last, independent
diff --git a/fe/src/main/java/org/apache/impala/analysis/TableDef.java b/fe/src/main/java/org/apache/impala/analysis/TableDef.java
index f9f57c5..a533490 100644
--- a/fe/src/main/java/org/apache/impala/analysis/TableDef.java
+++ b/fe/src/main/java/org/apache/impala/analysis/TableDef.java
@@ -18,12 +18,14 @@
 package org.apache.impala.analysis;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.impala.authorization.Privilege;
@@ -32,12 +34,15 @@ import org.apache.impala.catalog.FeFsTable;
 import org.apache.impala.catalog.FeTable;
 import org.apache.impala.catalog.HdfsStorageDescriptor;
 import org.apache.impala.catalog.RowFormat;
+import org.apache.impala.catalog.Type;
 import org.apache.impala.common.AnalysisException;
 import org.apache.impala.common.FileSystemUtil;
+import org.apache.impala.common.Pair;
 import org.apache.impala.thrift.TAccessEvent;
 import org.apache.impala.thrift.TCatalogObjectType;
 import org.apache.impala.thrift.THdfsFileFormat;
 import org.apache.impala.thrift.TQueryOptions;
+import org.apache.impala.thrift.TSortingOrder;
 import org.apache.impala.util.AcidUtils;
 import org.apache.impala.util.MetaStoreUtil;
 
@@ -126,11 +131,15 @@ class TableDef {
     // Key/values to persist with table metadata.
     final Map<String, String> tblProperties;
 
-    Options(List<String> sortCols, String comment, RowFormat rowFormat,
-        Map<String, String> serdeProperties, THdfsFileFormat fileFormat, HdfsUri location,
-        HdfsCachingOp cachingOp, Map<String, String> tblProperties,
-        TQueryOptions queryOptions) {
-      this.sortCols = sortCols;
+    // Sorting order for SORT BY queries.
+    final TSortingOrder sortingOrder;
+
+    Options(Pair<List<String>, TSortingOrder> sortProperties, String comment,
+        RowFormat rowFormat, Map<String, String> serdeProperties,
+        THdfsFileFormat fileFormat, HdfsUri location, HdfsCachingOp cachingOp,
+        Map<String, String> tblProperties, TQueryOptions queryOptions) {
+      this.sortCols = sortProperties.first;
+      this.sortingOrder = sortProperties.second;
       this.comment = comment;
       this.rowFormat = rowFormat;
       Preconditions.checkNotNull(serdeProperties);
@@ -148,9 +157,9 @@ class TableDef {
     public Options(String comment, TQueryOptions queryOptions) {
       // Passing null to file format so that it uses the file format from the query option
       // if specified, otherwise it will use the default file format, which is TEXT.
-      this(ImmutableList.of(), comment, RowFormat.DEFAULT_ROW_FORMAT,
-          new HashMap<>(), /*file format*/ null, null, null, new HashMap<>(),
-          queryOptions);
+      this(new Pair<>(ImmutableList.of(), TSortingOrder.LEXICAL), comment,
+          RowFormat.DEFAULT_ROW_FORMAT, new HashMap<>(), /* file format */null, null,
+          null, new HashMap<>(), queryOptions);
     }
   }
 
@@ -181,6 +190,10 @@ class TableDef {
   List<ColumnDef> getColumnDefs() { return columnDefs_; }
   List<String> getColumnNames() { return ColumnDef.toColumnNames(columnDefs_); }
 
+  List<Type> getColumnTypes() {
+    return columnDefs_.stream().map(col -> col.getType()).collect(Collectors.toList());
+  }
+
   List<String> getPartitionColumnNames() {
     return ColumnDef.toColumnNames(getPartitionColumnDefs());
   }
@@ -214,6 +227,7 @@ class TableDef {
   Map<String, String> getSerdeProperties() { return options_.serdeProperties; }
   THdfsFileFormat getFileFormat() { return options_.fileFormat; }
   RowFormat getRowFormat() { return options_.rowFormat; }
+  TSortingOrder getSortingOrder() { return options_.sortingOrder; }
 
   /**
    * Analyzes the parameters of a CREATE TABLE statement.
@@ -310,16 +324,20 @@ class TableDef {
   }
 
   /**
-   * Analyzes the list of columns in 'sortCols' against the columns of 'table'. Each
-   * column of 'sortCols' must occur in 'table' as a non-partitioning column. 'table'
-   * must be an HDFS table. If there are errors during the analysis, this will throw an
-   * AnalysisException.
+   * Analyzes the list of columns in 'sortCols' against the columns of 'table' and
+   * returns their matching positions in the table's columns. Each column of 'sortCols'
+   * must occur in 'table' as a non-partitioning column. 'table' must be an HDFS table.
+   * If there are errors during the analysis, this will throw an AnalysisException.
    */
-  public static void analyzeSortColumns(List<String> sortCols, FeTable table)
-      throws AnalysisException {
+  public static List<Integer> analyzeSortColumns(List<String> sortCols, FeTable table,
+      TSortingOrder sortingOrder) throws AnalysisException {
     Preconditions.checkState(table instanceof FeFsTable);
-    analyzeSortColumns(sortCols, Column.toColumnNames(table.getNonClusteringColumns()),
-        Column.toColumnNames(table.getClusteringColumns()));
+
+    List<Type> columnTypes = table.getNonClusteringColumns().stream().map(
+        col -> col.getType()).collect(Collectors.toList());
+    return analyzeSortColumns(sortCols,
+        Column.toColumnNames(table.getNonClusteringColumns()),
+        Column.toColumnNames(table.getClusteringColumns()), columnTypes, sortingOrder);
   }
 
   /**
@@ -329,8 +347,8 @@ class TableDef {
    * AnalysisException.
    */
   public static List<Integer> analyzeSortColumns(List<String> sortCols,
-      List<String> tableCols, List<String> partitionCols)
-      throws AnalysisException {
+      List<String> tableCols, List<String> partitionCols, List<Type> columnTypes,
+      TSortingOrder sortingOrder) throws AnalysisException {
     // The index of each sort column in the list of table columns.
     Set<Integer> colIdxs = new LinkedHashSet<>();
 
@@ -357,10 +375,33 @@ class TableDef {
         }
       }
       if (!foundColumn) {
-        throw new AnalysisException(String.format("Could not find SORT BY column '%s' " +
-            "in table.", sortColName));
+        throw new AnalysisException(String.format("Could not find SORT BY column " +
+            "'%s' in table.", sortColName));
       }
     }
+
+    // Analyzing Z-Order specific constraints
+    if (sortingOrder == TSortingOrder.ZORDER) {
+      if (numColumns == 1) {
+        throw new AnalysisException(String.format("SORT BY ZORDER with 1 column is " +
+            "equivalent to SORT BY. Please, use the latter, if that was your " +
+            "intention."));
+      }
+
+      List<? extends Type> notSupportedTypes = Arrays.asList(Type.STRING, Type.VARCHAR,
+          Type.FLOAT, Type.DOUBLE);
+      for (Integer position : colIdxs) {
+        Type colType = columnTypes.get(position);
+
+        if (notSupportedTypes.stream().anyMatch(type -> colType.matchesType(type))) {
+          throw new AnalysisException(String.format("SORT BY ZORDER does not support "
+              + "column types: %s", String.join(", ",
+                  notSupportedTypes.stream().map(type -> type.toString())
+                  .collect(Collectors.toList()))));
+        }
+      }
+    }
+
     Preconditions.checkState(numColumns == colIdxs.size());
     return Lists.newArrayList(colIdxs);
   }
@@ -388,18 +429,27 @@ class TableDef {
     analyzeRowFormat(analyzer);
 
     String sortByKey = AlterTableSortByStmt.TBL_PROP_SORT_COLUMNS;
+    String sortOrderKey = AlterTableSortByStmt.TBL_PROP_SORT_ORDER;
     if (options_.tblProperties.containsKey(sortByKey)) {
       throw new AnalysisException(String.format("Table definition must not contain the " +
           "%s table property. Use SORT BY (...) instead.", sortByKey));
     }
 
+    if (options_.tblProperties.containsKey(sortOrderKey)) {
+      throw new AnalysisException(String.format("Table definition must not contain the " +
+          "%s table property. Use SORT BY %s (...) instead.", sortOrderKey,
+          options_.sortingOrder.toString()));
+    }
+
     // Analyze sort columns.
     if (options_.sortCols == null) return;
     if (isKuduTable()) {
-      throw new AnalysisException("SORT BY is not supported for Kudu tables.");
+      throw new AnalysisException(String.format("SORT BY is not supported for Kudu "+
+          "tables."));
     }
-    analyzeSortColumns(options_.sortCols, getColumnNames(),
-        getPartitionColumnNames());
+
+    analyzeSortColumns(options_.sortCols, getColumnNames(), getPartitionColumnNames(),
+        getColumnTypes(), options_.sortingOrder);
   }
 
   private void analyzeRowFormat(Analyzer analyzer) throws AnalysisException {
diff --git a/fe/src/main/java/org/apache/impala/analysis/ToSqlUtils.java b/fe/src/main/java/org/apache/impala/analysis/ToSqlUtils.java
index 5ded467..54db034 100644
--- a/fe/src/main/java/org/apache/impala/analysis/ToSqlUtils.java
+++ b/fe/src/main/java/org/apache/impala/analysis/ToSqlUtils.java
@@ -45,6 +45,8 @@ import org.apache.impala.catalog.KuduColumn;
 import org.apache.impala.catalog.KuduTable;
 import org.apache.impala.catalog.RowFormat;
 import org.apache.impala.catalog.Table;
+import org.apache.impala.common.Pair;
+import org.apache.impala.thrift.TSortingOrder;
 import org.apache.impala.util.KuduUtil;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -62,11 +64,13 @@ import com.google.common.collect.Maps;
  */
 public class ToSqlUtils {
   // Table properties to hide when generating the toSql() statement
-  // EXTERNAL, SORT BY, and comment are hidden because they are part of the toSql result,
-  // e.g., "CREATE EXTERNAL TABLE <name> ... SORT BY (...) ... COMMENT <comment> ..."
+  // EXTERNAL, SORT BY [order], and comment are hidden because they are part of the
+  // toSql result, e.g.,
+  // "CREATE EXTERNAL TABLE <name> ... SORT BY ZORDER (...) ... COMMENT <comment> ..."
   @VisibleForTesting
   protected static final ImmutableSet<String> HIDDEN_TABLE_PROPERTIES = ImmutableSet.of(
-      "EXTERNAL", "comment", AlterTableSortByStmt.TBL_PROP_SORT_COLUMNS);
+      "EXTERNAL", "comment", AlterTableSortByStmt.TBL_PROP_SORT_COLUMNS,
+      AlterTableSortByStmt.TBL_PROP_SORT_ORDER);
 
   /**
    * Removes all hidden properties from the given 'tblProperties' map.
@@ -98,6 +102,17 @@ public class ToSqlUtils {
   }
 
   /**
+   * Returns the sorting order from 'properties' or the default value (lexicographic
+   * ordering), if 'properties' doesn't contain 'sort.order'.
+   */
+  @VisibleForTesting
+  protected static String getSortingOrder(Map<String, String> properties) {
+    String sortOrderKey = AlterTableSortByStmt.TBL_PROP_SORT_ORDER;
+    if (!properties.containsKey(sortOrderKey)) return TSortingOrder.LEXICAL.toString();
+    return properties.get(sortOrderKey);
+  }
+
+  /**
    * Returns a comma-delimited string of Kudu 'partition by' parameters on a
    * CreateTableStmt, or null if this isn't a CreateTableStmt for a Kudu table.
    */
@@ -238,10 +253,10 @@ public class ToSqlUtils {
     // TODO: Pass the correct compression, if applicable.
     return getCreateTableSql(stmt.getDb(), stmt.getTbl(), stmt.getComment(), colsSql,
         partitionColsSql, stmt.getTblPrimaryKeyColumnNames(), kuduParamsSql,
-        stmt.getSortColumns(), properties, stmt.getSerdeProperties(),
-        stmt.isExternal(), stmt.getIfNotExists(), stmt.getRowFormat(),
-        HdfsFileFormat.fromThrift(stmt.getFileFormat()), HdfsCompression.NONE, null,
-        stmt.getLocation());
+        new Pair<>(stmt.getSortColumns(), stmt.getSortingOrder()), properties,
+        stmt.getSerdeProperties(), stmt.isExternal(), stmt.getIfNotExists(),
+        stmt.getRowFormat(), HdfsFileFormat.fromThrift(stmt.getFileFormat()),
+        HdfsCompression.NONE, null, stmt.getLocation());
   }
 
   /**
@@ -273,8 +288,9 @@ public class ToSqlUtils {
     String createTableSql = getCreateTableSql(innerStmt.getDb(), innerStmt.getTbl(),
         innerStmt.getComment(), null, partitionColsSql,
         innerStmt.getTblPrimaryKeyColumnNames(), kuduParamsSql,
-        innerStmt.getSortColumns(), properties, innerStmt.getSerdeProperties(),
-        innerStmt.isExternal(), innerStmt.getIfNotExists(), innerStmt.getRowFormat(),
+        new Pair<>(innerStmt.getSortColumns(), innerStmt.getSortingOrder()),
+        properties, innerStmt.getSerdeProperties(), innerStmt.isExternal(),
+        innerStmt.getIfNotExists(), innerStmt.getRowFormat(),
         HdfsFileFormat.fromThrift(innerStmt.getFileFormat()), HdfsCompression.NONE, null,
         innerStmt.getLocation());
     return createTableSql + " AS " + stmt.getQueryStmt().toSql(options);
@@ -296,6 +312,7 @@ public class ToSqlUtils {
     boolean isExternal = msTable.getTableType() != null &&
         msTable.getTableType().equals(TableType.EXTERNAL_TABLE.toString());
     List<String> sortColsSql = getSortColumns(properties);
+    TSortingOrder sortingOrder = TSortingOrder.valueOf(getSortingOrder(properties));
     String comment = properties.get("comment");
     removeHiddenTableProperties(properties);
     List<String> colsSql = new ArrayList<>();
@@ -356,9 +373,10 @@ public class ToSqlUtils {
     }
     HdfsUri tableLocation = location == null ? null : new HdfsUri(location);
     return getCreateTableSql(table.getDb().getName(), table.getName(), comment, colsSql,
-        partitionColsSql, primaryKeySql, kuduPartitionByParams, sortColsSql, properties,
-        serdeParameters, isExternal, false, rowFormat, format, compression,
-        storageHandlerClassName, tableLocation);
+        partitionColsSql, primaryKeySql, kuduPartitionByParams,
+        new Pair<>(sortColsSql, sortingOrder), properties, serdeParameters,
+        isExternal, false, rowFormat, format, compression, storageHandlerClassName,
+        tableLocation);
   }
 
   /**
@@ -369,9 +387,10 @@ public class ToSqlUtils {
   public static String getCreateTableSql(String dbName, String tableName,
       String tableComment, List<String> columnsSql, List<String> partitionColumnsSql,
       List<String> primaryKeysSql, String kuduPartitionByParams,
-      List<String> sortColsSql, Map<String, String> tblProperties,
-      Map<String, String> serdeParameters, boolean isExternal, boolean ifNotExists,
-      RowFormat rowFormat, HdfsFileFormat fileFormat, HdfsCompression compression,
+      Pair<List<String>, TSortingOrder> sortProperties,
+      Map<String, String> tblProperties, Map<String, String> serdeParameters,
+      boolean isExternal, boolean ifNotExists, RowFormat rowFormat,
+      HdfsFileFormat fileFormat, HdfsCompression compression,
       String storageHandlerClass, HdfsUri location) {
     Preconditions.checkNotNull(tableName);
     StringBuilder sb = new StringBuilder("CREATE ");
@@ -405,10 +424,9 @@ public class ToSqlUtils {
     if (kuduPartitionByParams != null && !kuduPartitionByParams.equals("")) {
       sb.append("PARTITION BY " + kuduPartitionByParams + "\n");
     }
-
-    if (sortColsSql != null) {
-      sb.append(String.format("SORT BY (\n  %s\n)\n",
-          Joiner.on(", \n  ").join(sortColsSql)));
+    if (sortProperties.first != null) {
+      sb.append(String.format("SORT BY %s (\n  %s\n)\n", sortProperties.second.toString(),
+          Joiner.on(", \n  ").join(sortProperties.first)));
     }
 
     if (tableComment != null) sb.append(" COMMENT '" + tableComment + "'\n");
diff --git a/fe/src/main/java/org/apache/impala/analysis/UpdateStmt.java b/fe/src/main/java/org/apache/impala/analysis/UpdateStmt.java
index f38c253..575af59 100644
--- a/fe/src/main/java/org/apache/impala/analysis/UpdateStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/UpdateStmt.java
@@ -25,6 +25,7 @@ import java.util.List;
 import org.apache.impala.common.Pair;
 import org.apache.impala.planner.DataSink;
 import org.apache.impala.planner.TableSink;
+import org.apache.impala.thrift.TSortingOrder;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
@@ -66,7 +67,7 @@ public class UpdateStmt extends ModifyStmt {
     Preconditions.checkState(table_ != null);
     DataSink dataSink = TableSink.create(table_, TableSink.Op.UPDATE,
         ImmutableList.<Expr>of(), resultExprs, referencedColumns_, false, false,
-        ImmutableList.<Integer>of());
+        new Pair<>(ImmutableList.<Integer> of(), TSortingOrder.LEXICAL));
     Preconditions.checkState(!referencedColumns_.isEmpty());
     return dataSink;
   }
diff --git a/fe/src/main/java/org/apache/impala/planner/AnalyticPlanner.java b/fe/src/main/java/org/apache/impala/planner/AnalyticPlanner.java
index 598d483..f45d8fb 100644
--- a/fe/src/main/java/org/apache/impala/planner/AnalyticPlanner.java
+++ b/fe/src/main/java/org/apache/impala/planner/AnalyticPlanner.java
@@ -42,6 +42,7 @@ import org.apache.impala.analysis.TupleDescriptor;
 import org.apache.impala.analysis.TupleId;
 import org.apache.impala.analysis.TupleIsNullPredicate;
 import org.apache.impala.common.ImpalaException;
+import org.apache.impala.thrift.TSortingOrder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -276,9 +277,16 @@ public class AnalyticPlanner {
    * Create SortInfo, including sort tuple, to sort entire input row
    * on sortExprs.
    */
-  private SortInfo createSortInfo(
-      PlanNode input, List<Expr> sortExprs, List<Boolean> isAsc,
-      List<Boolean> nullsFirst) {
+  private SortInfo createSortInfo(PlanNode input, List<Expr> sortExprs,
+      List<Boolean> isAsc, List<Boolean> nullsFirst) {
+    return createSortInfo(input, sortExprs, isAsc, nullsFirst, TSortingOrder.LEXICAL);
+  }
+
+  /**
+   * Same as above, but with an extra parameter specifying the sorting order.
+   */
+  private SortInfo createSortInfo(PlanNode input, List<Expr> sortExprs,
+      List<Boolean> isAsc, List<Boolean> nullsFirst, TSortingOrder sortingOrder) {
     List<Expr> inputSlotRefs = new ArrayList<>();
     for (TupleId tid: input.getTupleIds()) {
       TupleDescriptor tupleDesc = analyzer_.getTupleDesc(tid);
@@ -306,7 +314,8 @@ public class AnalyticPlanner {
     ExprSubstitutionMap inputSmap = input.getOutputSmap();
     List<Expr> resolvedSortExprs =
         Expr.substituteList(sortExprs, inputSmap, analyzer_, true);
-    SortInfo sortInfo = new SortInfo(resolvedSortExprs, isAsc, nullsFirst);
+    SortInfo sortInfo = new SortInfo(resolvedSortExprs, isAsc, nullsFirst,
+        sortingOrder);
     sortInfo.createSortTupleInfo(inputSlotRefs, analyzer_);
 
     // Lhs exprs to be substituted in ancestor plan nodes could have a rhs that contains
@@ -626,7 +635,7 @@ public class AnalyticPlanner {
     List<AnalyticExpr> analyticExprs = analyticInfo_.getAnalyticExprs();
     List<WindowGroup> groups = new ArrayList<>();
     for (int i = 0; i < analyticExprs.size(); ++i) {
-      AnalyticExpr analyticExpr = (AnalyticExpr) analyticExprs.get(i);
+      AnalyticExpr analyticExpr = analyticExprs.get(i);
       // Do not generate the plan for non-materialized analytic exprs.
       if (!analyticInfo_.getOutputTupleDesc().getSlots().get(i).isMaterialized()) {
         continue;
@@ -634,7 +643,7 @@ public class AnalyticPlanner {
       boolean match = false;
       for (WindowGroup group: groups) {
         if (group.isCompatible(analyticExpr)) {
-          group.add((AnalyticExpr) analyticInfo_.getAnalyticExprs().get(i),
+          group.add(analyticInfo_.getAnalyticExprs().get(i),
               analyticInfo_.getOutputTupleDesc().getSlots().get(i),
               analyticInfo_.getIntermediateTupleDesc().getSlots().get(i));
           match = true;
@@ -643,7 +652,7 @@ public class AnalyticPlanner {
       }
       if (!match) {
         groups.add(new WindowGroup(
-            (AnalyticExpr) analyticInfo_.getAnalyticExprs().get(i),
+            analyticInfo_.getAnalyticExprs().get(i),
             analyticInfo_.getOutputTupleDesc().getSlots().get(i),
             analyticInfo_.getIntermediateTupleDesc().getSlots().get(i)));
       }
diff --git a/fe/src/main/java/org/apache/impala/planner/ExchangeNode.java b/fe/src/main/java/org/apache/impala/planner/ExchangeNode.java
index b8221c7..b1ad34a 100644
--- a/fe/src/main/java/org/apache/impala/planner/ExchangeNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/ExchangeNode.java
@@ -134,17 +134,9 @@ public class ExchangeNode extends PlanNode {
 
     if (isMergingExchange() && detailLevel.ordinal() > TExplainLevel.MINIMAL.ordinal()) {
       output.append(detailPrefix + "order by: ");
-      for (int i = 0; i < mergeInfo_.getSortExprs().size(); ++i) {
-        if (i > 0) output.append(", ");
-        output.append(mergeInfo_.getSortExprs().get(i).toSql() + " ");
-        output.append(mergeInfo_.getIsAscOrder().get(i) ? "ASC" : "DESC");
-
-        Boolean nullsFirstParam = mergeInfo_.getNullsFirstParams().get(i);
-        if (nullsFirstParam != null) {
-          output.append(nullsFirstParam ? " NULLS FIRST" : " NULLS LAST");
-        }
-      }
-      output.append("\n");
+      output.append(getSortingOrderExplainString(mergeInfo_.getSortExprs(),
+          mergeInfo_.getIsAscOrder(), mergeInfo_.getNullsFirstParams(),
+          mergeInfo_.getSortingOrder()));
     }
     return output.toString();
   }
@@ -267,7 +259,7 @@ public class ExchangeNode extends PlanNode {
     if (isMergingExchange()) {
       TSortInfo sortInfo = new TSortInfo(
           Expr.treesToThrift(mergeInfo_.getSortExprs()), mergeInfo_.getIsAscOrder(),
-          mergeInfo_.getNullsFirst());
+          mergeInfo_.getNullsFirst(), mergeInfo_.getSortingOrder());
       msg.exchange_node.setSort_info(sortInfo);
       msg.exchange_node.setOffset(offset_);
     }
diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsTableSink.java b/fe/src/main/java/org/apache/impala/planner/HdfsTableSink.java
index 3eaf356..5ddc63d 100644
--- a/fe/src/main/java/org/apache/impala/planner/HdfsTableSink.java
+++ b/fe/src/main/java/org/apache/impala/planner/HdfsTableSink.java
@@ -26,11 +26,13 @@ import org.apache.impala.analysis.Expr;
 import org.apache.impala.catalog.FeFsTable;
 import org.apache.impala.catalog.FeTable;
 import org.apache.impala.catalog.HdfsFileFormat;
+import org.apache.impala.common.Pair;
 import org.apache.impala.thrift.TDataSink;
 import org.apache.impala.thrift.TDataSinkType;
 import org.apache.impala.thrift.TExplainLevel;
 import org.apache.impala.thrift.THdfsTableSink;
 import org.apache.impala.thrift.TQueryOptions;
+import org.apache.impala.thrift.TSortingOrder;
 import org.apache.impala.thrift.TTableSink;
 import org.apache.impala.thrift.TTableSinkType;
 
@@ -66,21 +68,24 @@ public class HdfsTableSink extends TableSink {
   // Stores the indices into the list of non-clustering columns of the target table that
   // are stored in the 'sort.columns' table property. This is sent to the backend to
   // populate the RowGroup::sorting_columns list in parquet files.
+  // If sortingOrder_ is not LEXICAL, the backend skips populating that list.
   private List<Integer> sortColumns_ = new ArrayList<>();
+  private TSortingOrder sortingOrder_;
 
   // Stores the allocated write id if the target table is transactional, otherwise -1.
   private long writeId_;
 
   public HdfsTableSink(FeTable targetTable, List<Expr> partitionKeyExprs,
       List<Expr> outputExprs,
-      boolean overwrite, boolean inputIsClustered, List<Integer> sortColumns,
-      long writeId) {
+      boolean overwrite, boolean inputIsClustered,
+      Pair<List<Integer>, TSortingOrder> sortProperties, long writeId) {
     super(targetTable, Op.INSERT, outputExprs);
     Preconditions.checkState(targetTable instanceof FeFsTable);
     partitionKeyExprs_ = partitionKeyExprs;
     overwrite_ = overwrite;
     inputIsClustered_ = inputIsClustered;
-    sortColumns_ = sortColumns;
+    sortColumns_ = sortProperties.first;
+    sortingOrder_ = sortProperties.second;
     writeId_ = writeId;
   }
 
@@ -192,7 +197,8 @@ public class HdfsTableSink extends TableSink {
   @Override
   protected void toThriftImpl(TDataSink tsink) {
     THdfsTableSink hdfsTableSink = new THdfsTableSink(
-        Expr.treesToThrift(partitionKeyExprs_), overwrite_, inputIsClustered_);
+        Expr.treesToThrift(partitionKeyExprs_), overwrite_, inputIsClustered_,
+        sortingOrder_);
     FeFsTable table = (FeFsTable) targetTable_;
     StringBuilder error = new StringBuilder();
     int skipHeaderLineCount = table.parseSkipHeaderLineCount(error);
@@ -202,8 +208,8 @@ public class HdfsTableSink extends TableSink {
       hdfsTableSink.setSkip_header_line_count(skipHeaderLineCount);
     }
     hdfsTableSink.setSort_columns(sortColumns_);
+    hdfsTableSink.setSorting_order(sortingOrder_);
     if (writeId_ != -1) hdfsTableSink.setWrite_id(writeId_);
-
     TTableSink tTableSink = new TTableSink(DescriptorTable.TABLE_SINK_ID,
         TTableSinkType.HDFS, sinkOp_.toThrift());
     tTableSink.hdfs_table_sink = hdfsTableSink;
diff --git a/fe/src/main/java/org/apache/impala/planner/PlanNode.java b/fe/src/main/java/org/apache/impala/planner/PlanNode.java
index 0ab96e1..cd127bc 100644
--- a/fe/src/main/java/org/apache/impala/planner/PlanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/PlanNode.java
@@ -29,6 +29,7 @@ import org.apache.impala.analysis.Analyzer;
 import org.apache.impala.analysis.Expr;
 import org.apache.impala.analysis.ExprId;
 import org.apache.impala.analysis.ExprSubstitutionMap;
+import org.apache.impala.analysis.ToSqlOptions;
 import org.apache.impala.analysis.TupleDescriptor;
 import org.apache.impala.analysis.TupleId;
 import org.apache.impala.common.ImpalaException;
@@ -41,6 +42,7 @@ import org.apache.impala.thrift.TExplainLevel;
 import org.apache.impala.thrift.TPlan;
 import org.apache.impala.thrift.TPlanNode;
 import org.apache.impala.thrift.TQueryOptions;
+import org.apache.impala.thrift.TSortingOrder;
 import org.apache.impala.util.BitUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -621,6 +623,50 @@ abstract public class PlanNode extends TreeNode<PlanNode> {
     return output.toString();
   }
 
+  protected String getExplainString(
+      List<? extends Expr> exprs, TExplainLevel detailLevel) {
+    if (exprs == null) return "";
+    ToSqlOptions toSqlOptions =
+        detailLevel.ordinal() >= TExplainLevel.EXTENDED.ordinal() ?
+        ToSqlOptions.SHOW_IMPLICIT_CASTS :
+        ToSqlOptions.DEFAULT;
+    StringBuilder output = new StringBuilder();
+    for (int i = 0; i < exprs.size(); ++i) {
+      if (i > 0) output.append(", ");
+      output.append(exprs.get(i).toSql(toSqlOptions));
+    }
+    return output.toString();
+  }
+
+  protected String getSortingOrderExplainString(List<? extends Expr> exprs,
+      List<Boolean> isAscOrder, List<Boolean> nullsFirstParams,
+      TSortingOrder sortingOrder) {
+    StringBuilder output = new StringBuilder();
+    switch (sortingOrder) {
+    case LEXICAL:
+      for (int i = 0; i < exprs.size(); ++i) {
+        if (i > 0) output.append(", ");
+        output.append(exprs.get(i).toSql() + " ");
+        output.append(isAscOrder.get(i) ? "ASC" : "DESC");
+
+        Boolean nullsFirstParam = nullsFirstParams.get(i);
+        if (nullsFirstParam != null) {
+          output.append(nullsFirstParam ? " NULLS FIRST" : " NULLS LAST");
+        }
+      }
+      break;
+    case ZORDER:
+      output.append("ZORDER: ");
+      for (int i = 0; i < exprs.size(); ++i) {
+        if (i > 0) output.append(", ");
+        output.append(exprs.get(i).toSql());
+      }
+      break;
+    }
+    output.append("\n");
+    return output.toString();
+  }
+
   /**
    * Returns true if stats-related variables are valid.
    */
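
A minimal, self-contained sketch of the two explain shapes that the new
getSortingOrderExplainString() helper produces (not Impala code; the class, enum, and
inputs below are invented for illustration): a lexical sort keeps the per-column
ASC/DESC and NULLS FIRST/LAST qualifiers, while a Z-ordered sort prints a single
"ZORDER:" prefix followed by the column list.

    import java.util.Arrays;
    import java.util.List;

    public class SortingOrderExplainSketch {
      enum Order { LEXICAL, ZORDER }  // stand-in for TSortingOrder

      static String explain(List<String> cols, List<Boolean> isAsc,
          List<Boolean> nullsFirst, Order order) {
        StringBuilder out = new StringBuilder();
        if (order == Order.ZORDER) out.append("ZORDER: ");
        for (int i = 0; i < cols.size(); ++i) {
          if (i > 0) out.append(", ");
          out.append(cols.get(i));
          if (order == Order.LEXICAL) {
            out.append(isAsc.get(i) ? " ASC" : " DESC");
            Boolean nf = nullsFirst.get(i);
            if (nf != null) out.append(nf ? " NULLS FIRST" : " NULLS LAST");
          }
        }
        return out.append("\n").toString();
      }

      public static void main(String[] args) {
        List<String> cols = Arrays.asList("int_col", "bool_col");
        // "int_col ASC, bool_col DESC NULLS LAST"
        System.out.print(explain(cols, Arrays.asList(true, false),
            Arrays.asList((Boolean) null, false), Order.LEXICAL));
        // "ZORDER: int_col, bool_col"
        System.out.print(explain(cols, Arrays.asList(true, true),
            Arrays.asList((Boolean) null, null), Order.ZORDER));
      }
    }

SortNode and ExchangeNode prepend "order by: " before calling the helper, which matches
the "order by: ZORDER: ..." lines in the planner test file added at the end of this
change.
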
diff --git a/fe/src/main/java/org/apache/impala/planner/Planner.java b/fe/src/main/java/org/apache/impala/planner/Planner.java
index f265ae9..6a51983 100644
--- a/fe/src/main/java/org/apache/impala/planner/Planner.java
+++ b/fe/src/main/java/org/apache/impala/planner/Planner.java
@@ -699,7 +699,8 @@ public class Planner {
     // Build sortinfo to sort by the ordering exprs.
     List<Boolean> isAscOrder = Collections.nCopies(orderingExprs.size(), true);
     List<Boolean> nullsFirstParams = Collections.nCopies(orderingExprs.size(), false);
-    SortInfo sortInfo = new SortInfo(orderingExprs, isAscOrder, nullsFirstParams);
+    SortInfo sortInfo = new SortInfo(orderingExprs, isAscOrder, nullsFirstParams,
+        insertStmt.getSortingOrder());
     sortInfo.createSortTupleInfo(insertStmt.getResultExprs(), analyzer);
     sortInfo.getSortTupleDescriptor().materializeSlots();
     insertStmt.substituteResultExprs(sortInfo.getOutputSmap(), analyzer);
diff --git a/fe/src/main/java/org/apache/impala/planner/SortNode.java b/fe/src/main/java/org/apache/impala/planner/SortNode.java
index 2e5a09b..c05969c 100644
--- a/fe/src/main/java/org/apache/impala/planner/SortNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/SortNode.java
@@ -34,6 +34,7 @@ import org.apache.impala.thrift.TQueryOptions;
 import org.apache.impala.thrift.TSortInfo;
 import org.apache.impala.thrift.TSortNode;
 import org.apache.impala.thrift.TSortType;
+import org.apache.impala.thrift.TSortingOrder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -190,7 +191,7 @@ public class SortNode extends PlanNode {
   protected void toThrift(TPlanNode msg) {
     msg.node_type = TPlanNodeType.SORT_NODE;
     TSortInfo sort_info = new TSortInfo(Expr.treesToThrift(info_.getSortExprs()),
-        info_.getIsAscOrder(), info_.getNullsFirst());
+        info_.getIsAscOrder(), info_.getNullsFirst(), info_.getSortingOrder());
     Preconditions.checkState(tupleIds_.size() == 1,
         "Incorrect size for tupleIds_ in SortNode");
     sort_info.setSort_tuple_slot_exprs(Expr.treesToThrift(resolvedTupleExprs_));
@@ -207,17 +208,8 @@ public class SortNode extends PlanNode {
         displayName_, getNodeExplainDetail(detailLevel)));
     if (detailLevel.ordinal() >= TExplainLevel.STANDARD.ordinal()) {
       output.append(detailPrefix + "order by: ");
-      for (int i = 0; i < info_.getSortExprs().size(); ++i) {
-        if (i > 0) output.append(", ");
-        output.append(info_.getSortExprs().get(i).toSql() + " ");
-        output.append(info_.getIsAscOrder().get(i) ? "ASC" : "DESC");
-
-        Boolean nullsFirstParam = info_.getNullsFirstParams().get(i);
-        if (nullsFirstParam != null) {
-          output.append(nullsFirstParam ? " NULLS FIRST" : " NULLS LAST");
-        }
-      }
-      output.append("\n");
+      output.append(getSortingOrderExplainString(info_.getSortExprs(),
+          info_.getIsAscOrder(), info_.getNullsFirstParams(), info_.getSortingOrder()));
     }
 
     if (detailLevel.ordinal() >= TExplainLevel.EXTENDED.ordinal()) {
diff --git a/fe/src/main/java/org/apache/impala/planner/TableSink.java b/fe/src/main/java/org/apache/impala/planner/TableSink.java
index f37a233..6b23ef0 100644
--- a/fe/src/main/java/org/apache/impala/planner/TableSink.java
+++ b/fe/src/main/java/org/apache/impala/planner/TableSink.java
@@ -24,7 +24,9 @@ import org.apache.impala.catalog.FeFsTable;
 import org.apache.impala.catalog.FeHBaseTable;
 import org.apache.impala.catalog.FeKuduTable;
 import org.apache.impala.catalog.FeTable;
+import org.apache.impala.common.Pair;
 import org.apache.impala.thrift.TSinkAction;
+import org.apache.impala.thrift.TSortingOrder;
 
 import com.google.common.base.Preconditions;
 
@@ -91,15 +93,16 @@ public abstract class TableSink extends DataSink {
    * Not all Ops are supported for all tables.
    * All parameters must be non-null, the lists in particular need to be empty if they
    * don't make sense for a certain table type.
-   * For HDFS tables 'sortColumns' specifies the indices into the list of non-clustering
-   * columns of the target table that are stored in the 'sort.columns' table property.
+   * For HDFS tables 'sortProperties' specifies two things: the indices into the list of
+   * non-clustering columns of the target table that are stored in the 'sort.columns'
+   * table property, and the sorting order.
    */
   public static TableSink create(FeTable table, Op sinkAction,
       List<Expr> partitionKeyExprs, List<Expr> outputExprs,
-      List<Integer> referencedColumns,
-      boolean overwrite, boolean inputIsClustered, List<Integer> sortColumns) {
+      List<Integer> referencedColumns, boolean overwrite,
+      boolean inputIsClustered, Pair<List<Integer>, TSortingOrder> sortProperties) {
     return create(table, sinkAction, partitionKeyExprs, outputExprs, referencedColumns,
-        overwrite, inputIsClustered, sortColumns, -1);
+        overwrite, inputIsClustered, sortProperties, -1);
   }
 
   /**
@@ -108,18 +111,18 @@ public abstract class TableSink extends DataSink {
   public static TableSink create(FeTable table, Op sinkAction,
       List<Expr> partitionKeyExprs, List<Expr> outputExprs,
       List<Integer> referencedColumns,
-      boolean overwrite, boolean inputIsClustered, List<Integer> sortColumns,
-      long writeId) {
+      boolean overwrite, boolean inputIsClustered,
+      Pair<List<Integer>, TSortingOrder> sortProperties, long writeId) {
     Preconditions.checkNotNull(partitionKeyExprs);
     Preconditions.checkNotNull(referencedColumns);
-    Preconditions.checkNotNull(sortColumns);
+    Preconditions.checkNotNull(sortProperties.first);
     if (table instanceof FeFsTable) {
       // Hdfs only supports inserts.
       Preconditions.checkState(sinkAction == Op.INSERT);
       // Referenced columns don't make sense for an Hdfs table.
       Preconditions.checkState(referencedColumns.isEmpty());
       return new HdfsTableSink(table, partitionKeyExprs,outputExprs, overwrite,
-          inputIsClustered, sortColumns, writeId);
+          inputIsClustered, sortProperties, writeId);
     } else if (table instanceof FeHBaseTable) {
       // HBase only supports inserts.
       Preconditions.checkState(sinkAction == Op.INSERT);
@@ -130,14 +133,14 @@ public abstract class TableSink extends DataSink {
       // Referenced columns don't make sense for an HBase table.
       Preconditions.checkState(referencedColumns.isEmpty());
       // Sort columns are not supported for HBase tables.
-      Preconditions.checkState(sortColumns.isEmpty());
+      Preconditions.checkState(sortProperties.first.isEmpty());
       // Create the HBaseTableSink and return it.
       return new HBaseTableSink(table, outputExprs);
     } else if (table instanceof FeKuduTable) {
       // Kudu doesn't have a way to perform INSERT OVERWRITE.
       Preconditions.checkState(overwrite == false);
       // Sort columns are not supported for Kudu tables.
-      Preconditions.checkState(sortColumns.isEmpty());
+      Preconditions.checkState(sortProperties.first.isEmpty());
       return new KuduTableSink(table, sinkAction, referencedColumns, outputExprs);
     } else {
       throw new UnsupportedOperationException(
diff --git a/fe/src/main/java/org/apache/impala/service/BackendConfig.java b/fe/src/main/java/org/apache/impala/service/BackendConfig.java
index b528435..bfe364a 100644
--- a/fe/src/main/java/org/apache/impala/service/BackendConfig.java
+++ b/fe/src/main/java/org/apache/impala/service/BackendConfig.java
@@ -181,6 +181,14 @@ public class BackendConfig {
     return backendCfg_.blacklisted_tables;
   }
 
+  public boolean isZOrderSortUnlocked() {
+    return backendCfg_.unlock_zorder_sort;
+  }
+
+  public void setZOrderSortUnlocked(boolean zOrdering) {
+    backendCfg_.setUnlock_zorder_sort(zOrdering);
+  }
+
   // Inits the auth_to_local configuration in the static KerberosName class.
   private static void initAuthToLocal() {
     // If auth_to_local is enabled, we read the configuration hadoop.security.auth_to_local
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index 31e6b5d..6e4baea 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -165,6 +165,7 @@ import org.apache.impala.thrift.TResetMetadataResponse;
 import org.apache.impala.thrift.TResultRow;
 import org.apache.impala.thrift.TResultSet;
 import org.apache.impala.thrift.TResultSetMetadata;
+import org.apache.impala.thrift.TSortingOrder;
 import org.apache.impala.thrift.TStatus;
 import org.apache.impala.thrift.TTable;
 import org.apache.impala.thrift.TTableName;
@@ -2098,6 +2099,10 @@ public class CatalogOpExecutor {
     if (params.isSetSort_columns() && !params.sort_columns.isEmpty()) {
       tbl.getParameters().put(AlterTableSortByStmt.TBL_PROP_SORT_COLUMNS,
           Joiner.on(",").join(params.sort_columns));
+      TSortingOrder sortingOrder = params.isSetSorting_order() ?
+          params.sorting_order : TSortingOrder.LEXICAL;
+      tbl.getParameters().put(AlterTableSortByStmt.TBL_PROP_SORT_ORDER,
+          sortingOrder.toString());
     }
     if (params.getComment() != null) {
       tbl.getParameters().put("comment", params.getComment());
@@ -2366,6 +2371,10 @@ public class CatalogOpExecutor {
     if (params.isSetSort_columns() && !params.sort_columns.isEmpty()) {
       tbl.getParameters().put(AlterTableSortByStmt.TBL_PROP_SORT_COLUMNS,
           Joiner.on(",").join(params.sort_columns));
+      TSortingOrder sortingOrder = params.isSetSorting_order() ?
+          params.sorting_order : TSortingOrder.LEXICAL;
+      tbl.getParameters().put(AlterTableSortByStmt.TBL_PROP_SORT_ORDER,
+          sortingOrder.toString());
     }
     if (comment != null) {
       tbl.getParameters().put("comment", comment);
diff --git a/fe/src/main/jflex/sql-scanner.flex b/fe/src/main/jflex/sql-scanner.flex
index 0867787..5d639ed 100644
--- a/fe/src/main/jflex/sql-scanner.flex
+++ b/fe/src/main/jflex/sql-scanner.flex
@@ -163,6 +163,7 @@ import org.apache.impala.thrift.TReservedWordsVersion;
     keywordMap.put("kudu", SqlParserSymbols.KW_KUDU);
     keywordMap.put("last", SqlParserSymbols.KW_LAST);
     keywordMap.put("left", SqlParserSymbols.KW_LEFT);
+    keywordMap.put("lexical", SqlParserSymbols.KW_LEXICAL);
     keywordMap.put("like", SqlParserSymbols.KW_LIKE);
     keywordMap.put("limit", SqlParserSymbols.KW_LIMIT);
     keywordMap.put("lines", SqlParserSymbols.KW_LINES);
@@ -256,6 +257,7 @@ import org.apache.impala.thrift.TReservedWordsVersion;
     keywordMap.put("when", SqlParserSymbols.KW_WHEN);
     keywordMap.put("where", SqlParserSymbols.KW_WHERE);
     keywordMap.put("with", SqlParserSymbols.KW_WITH);
+    keywordMap.put("zorder", SqlParserSymbols.KW_ZORDER);
 
      // Initialize tokenIdMap for error reporting
     tokenIdMap = new HashMap<>();
diff --git a/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java b/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java
index aeb71df..f807bdc 100644
--- a/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java
@@ -54,7 +54,9 @@ import org.apache.impala.thrift.TBackendGflags;
 import org.apache.impala.thrift.TDescribeTableParams;
 import org.apache.impala.thrift.TQueryOptions;
 import org.apache.impala.util.MetaStoreUtil;
+import org.junit.AfterClass;
 import org.junit.Assert;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
 import com.google.common.base.Joiner;
@@ -1291,6 +1293,30 @@ public class AnalyzeDDLTest extends FrontendTestBase {
   }
 
   @Test
+  public void TestAlterTableSortByZOrder() {
+
+    BackendConfig.INSTANCE.setZOrderSortUnlocked(true);
+
+    AnalyzesOk("alter table functional.alltypes sort by zorder (int_col,id)");
+    AnalyzesOk("alter table functional.alltypes sort by zorder (bool_col,int_col,id)");
+    AnalyzesOk("alter table functional.alltypes sort by zorder ()");
+    AnalysisError("alter table functional.alltypes sort by zorder (id)",
+        "SORT BY ZORDER with 1 column is equivalent to SORT BY. Please, use the " +
+        "latter, if that was your intention.");
+    AnalysisError("alter table functional.alltypes sort by zorder (id,int_col,id)",
+        "Duplicate column in SORT BY list: id");
+    AnalysisError("alter table functional.alltypes sort by zorder (id, foo)", "Could " +
+        "not find SORT BY column 'foo' in table.");
+    AnalysisError("alter table functional_hbase.alltypes sort by zorder (id, foo)",
+        "ALTER TABLE SORT BY not supported on HBase tables.");
+    AnalysisError("alter table functional.alltypes sort by zorder (bool_col,string_col)",
+        "SORT BY ZORDER does not support column types: STRING, VARCHAR(*), FLOAT, " +
+        "DOUBLE");
+
+    BackendConfig.INSTANCE.setZOrderSortUnlocked(false);
+  }
+
+  @Test
   public void TestAlterView() {
     // View-definition references a table.
     AnalyzesOk("alter view functional.alltypes_view as " +
@@ -1924,6 +1950,9 @@ public class AnalyzeDDLTest extends FrontendTestBase {
 
   @Test
   public void TestCreateTableLikeFile() throws AnalysisException {
+
+    BackendConfig.INSTANCE.setZOrderSortUnlocked(true);
+
     // check that we analyze all of the CREATE TABLE options
     AnalyzesOk("create table if not exists newtbl_DNE like parquet "
         + "'/test-warehouse/schemas/alltypestiny.parquet'");
@@ -1936,8 +1965,17 @@ public class AnalyzeDDLTest extends FrontendTestBase {
     AnalyzesOk("create external table newtbl_DNE like parquet "
         + "'/test-warehouse/schemas/zipcode_incomes.parquet' sort by (id,zip) "
         + "stored as parquet");
+    AnalyzesOk("create external table newtbl_DNE like parquet "
+        + "'/test-warehouse/schemas/decimal.parquet' sort by zorder (d32, d11) "
+        + "stored as parquet");
     AnalyzesOk("create table newtbl_DNE like parquet "
         + "'/test-warehouse/schemas/zipcode_incomes.parquet' sort by (id,zip)");
+    AnalyzesOk("create table newtbl_DNE like parquet "
+        + "'/test-warehouse/schemas/decimal.parquet' sort by zorder (d32, d11)");
+    AnalysisError("create table newtbl_DNE like parquet "
+        + "'/test-warehouse/schemas/zipcode_incomes.parquet' sort by  zorder (id,zip)",
+        "SORT BY ZORDER does not support column types: STRING, VARCHAR(*), FLOAT, " +
+        "DOUBLE");
     AnalyzesOk("create table if not exists functional.zipcode_incomes like parquet "
         + "'/test-warehouse/schemas/zipcode_incomes.parquet'");
     AnalyzesOk("create table if not exists newtbl_DNE like parquet "
@@ -1983,6 +2021,9 @@ public class AnalyzeDDLTest extends FrontendTestBase {
     AnalysisError("create table newtbl_kudu like parquet " +
         "'/test-warehouse/schemas/alltypestiny.parquet' stored as kudu",
         "CREATE TABLE LIKE FILE statement is not supported for Kudu tables.");
+
+
+    BackendConfig.INSTANCE.setZOrderSortUnlocked(false);
   }
 
   @Test
@@ -2264,6 +2305,18 @@ public class AnalyzeDDLTest extends FrontendTestBase {
     AnalyzesOk("create table tbl sort by (int_col,id) like functional.alltypes");
     AnalysisError("create table tbl sort by (int_col,foo) like functional.alltypes",
         "Could not find SORT BY column 'foo' in table.");
+
+    // Test zsort columns.
+    BackendConfig.INSTANCE.setZOrderSortUnlocked(true);
+
+    AnalyzesOk("create table tbl sort by zorder (int_col,id) like functional.alltypes");
+    AnalysisError("create table tbl sort by zorder (int_col,foo) like " +
+        "functional.alltypes", "Could not find SORT BY column 'foo' in table.");
+    AnalysisError("create table tbl sort by zorder (string_col,id) like " +
+        "functional.alltypes", "SORT BY ZORDER does not support column types: STRING, " +
+        "VARCHAR(*), FLOAT, DOUBLE");
+
+    BackendConfig.INSTANCE.setZOrderSortUnlocked(false);
   }
 
   @Test
@@ -2513,26 +2566,65 @@ public class AnalyzeDDLTest extends FrontendTestBase {
           type.name());
     }
 
+    BackendConfig.INSTANCE.setZOrderSortUnlocked(true);
+
     // Tables with sort columns
     AnalyzesOk("create table functional.new_table (i int, j int) sort by (i)");
     AnalyzesOk("create table functional.new_table (i int, j int) sort by (i, j)");
     AnalyzesOk("create table functional.new_table (i int, j int) sort by (j, i)");
 
+    // Tables with sort columns, using Z-ordering
+    AnalysisError("create table functional.new_table (i int, j int) sort by zorder (i)",
+        "SORT BY ZORDER with 1 column is equivalent to SORT BY. Please, use the " +
+        "latter, if that was your intention.");
+    AnalyzesOk("create table functional.new_table (i int, j int) sort by zorder (i, j)");
+    AnalyzesOk("create table functional.new_table (i int, j int) sort by zorder (j, i)");
+
     // 'sort.columns' property not supported in table definition.
     AnalysisError("create table Foo (i int) sort by (i) " +
         "tblproperties ('sort.columns'='i')", "Table definition must not contain the " +
         "sort.columns table property. Use SORT BY (...) instead.");
 
+    // 'sort.order' property not supported in table definition.
+    AnalysisError("create table Foo (i int, j int) sort by zorder (i, j) " +
+        "tblproperties ('sort.order'='ZORDER')", "Table definition must not " +
+        "contain the sort.order table property. Use SORT BY ZORDER (...) instead.");
+
     // Column in sort by list must exist.
     AnalysisError("create table functional.new_table (i int) sort by (j)", "Could not " +
         "find SORT BY column 'j' in table.");
 
+    // Column in sort by zorder list must exist.
+    AnalysisError("create table functional.new_table (i int) sort by zorder (j)",
+        "Could not find SORT BY column 'j' in table.");
+
     // Partitioned HDFS table
     AnalyzesOk("create table functional.new_table (i int) PARTITIONED BY (d decimal)" +
         "SORT BY (i)");
+    AnalyzesOk("create table functional.new_table (i int, j int) PARTITIONED BY " +
+        "(d decimal) sort by zorder (i, j)");
     // Column in sort by list must not be a Hdfs partition column.
     AnalysisError("create table functional.new_table (i int) PARTITIONED BY (d decimal)" +
         "SORT BY (d)", "SORT BY column list must not contain partition column: 'd'");
+    // Column in sort by list must not be a Hdfs partition column.
+    AnalysisError("create table functional.new_table (i int) PARTITIONED BY (d decimal)" +
+        "sort by zorder (i, d)", "SORT BY column list must not contain partition " +
+        "column: 'd'");
+    // For Z-Order, string, varchar, float and double columns are not supported.
+    AnalysisError("create table functional.new_table (i int, s string) sort by zorder " +
+        "(i, s)", "SORT BY ZORDER does not support column types: STRING, VARCHAR(*), " +
+        "FLOAT, DOUBLE");
+    AnalysisError("create table functional.new_table (i int, s varchar(32)) sort by " +
+        "zorder (i, s)", "SORT BY ZORDER does not support column types: STRING, " +
+        "VARCHAR(*), FLOAT, DOUBLE");
+    AnalysisError("create table functional.new_table (i int, s double) sort by zorder " +
+        "(i, s)", "SORT BY ZORDER does not support column types: STRING, VARCHAR(*), " +
+        "FLOAT, DOUBLE");
+    AnalysisError("create table functional.new_table (i int, s float) sort by zorder " +
+        "(i, s)", "SORT BY ZORDER does not support column types: STRING, VARCHAR(*), " +
+        "FLOAT, DOUBLE");
+
+    BackendConfig.INSTANCE.setZOrderSortUnlocked(false);
   }
 
   @Test
@@ -3497,7 +3589,7 @@ public class AnalyzeDDLTest extends FrontendTestBase {
     // Single element path can only be resolved as <table>.
     DescribeTableStmt describe = (DescribeTableStmt)AnalyzesOk("describe ambig",
         createAnalysisCtx("ambig"));
-    TDescribeTableParams tdesc = (TDescribeTableParams) describe.toThrift();
+    TDescribeTableParams tdesc = describe.toThrift();
     Assert.assertTrue(tdesc.isSetTable_name());
     Assert.assertEquals("ambig", tdesc.table_name.getDb_name());
     Assert.assertEquals("ambig", tdesc.table_name.getTable_name(), "ambig");
@@ -3512,7 +3604,7 @@ public class AnalyzeDDLTest extends FrontendTestBase {
     // 4 element path can only be resolved to nested array.
     describe = (DescribeTableStmt) AnalyzesOk(
         "describe ambig.ambig.ambig.ambig", createAnalysisCtx("ambig"));
-    tdesc = (TDescribeTableParams) describe.toThrift();
+    tdesc = describe.toThrift();
     Type expectedType =
         org.apache.impala.analysis.Path.getTypeAsStruct(new ArrayType(Type.INT));
     Assert.assertTrue(tdesc.isSetResult_struct());
diff --git a/fe/src/test/java/org/apache/impala/analysis/AnalyzeKuduDDLTest.java b/fe/src/test/java/org/apache/impala/analysis/AnalyzeKuduDDLTest.java
index 6dfa163..4269498 100644
--- a/fe/src/test/java/org/apache/impala/analysis/AnalyzeKuduDDLTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/AnalyzeKuduDDLTest.java
@@ -19,6 +19,7 @@ package org.apache.impala.analysis;
 
 import org.apache.impala.catalog.KuduTable;
 import org.apache.impala.common.FrontendTestBase;
+import org.apache.impala.service.BackendConfig;
 import org.apache.impala.testutil.TestUtils;
 import org.apache.kudu.ColumnSchema.CompressionAlgorithm;
 import org.apache.kudu.ColumnSchema.Encoding;
@@ -367,6 +368,15 @@ public class AnalyzeKuduDDLTest extends FrontendTestBase {
         "partitions 8 sort by(i) stored as kudu", "SORT BY is not supported for Kudu " +
         "tables.");
 
+    // Z-Sort columns are not supported for Kudu tables.
+    BackendConfig.INSTANCE.setZOrderSortUnlocked(true);
+
+    AnalysisError("create table tab (i int, x int primary key) partition by hash(x) " +
+        "partitions 8 sort by zorder(i) stored as kudu", "SORT BY is not " +
+        "supported for Kudu tables.");
+
+    BackendConfig.INSTANCE.setZOrderSortUnlocked(false);
+
     // Range partitions with TIMESTAMP
     AnalyzesOk("create table ts_ranges (ts timestamp primary key) " +
         "partition by range (partition cast('2009-01-01 00:00:00' as timestamp) " +
@@ -615,9 +625,22 @@ public class AnalyzeKuduDDLTest extends FrontendTestBase {
     AnalysisError("alter table functional_kudu.alltypes sort by (int_col)",
         "ALTER TABLE SORT BY not supported on Kudu tables.");
 
+    BackendConfig.INSTANCE.setZOrderSortUnlocked(true);
+
+    // ALTER TABLE SORT BY ZORDER
+    AnalysisError("alter table functional_kudu.alltypes sort by zorder (int_col)",
+        "ALTER TABLE SORT BY not supported on Kudu tables.");
+
     // ALTER TABLE SET TBLPROPERTIES for sort.columns
     AnalysisError("alter table functional_kudu.alltypes set tblproperties(" +
         "'sort.columns'='int_col')",
-        "'sort.columns' table property is not supported for Kudu tables.");
+        "'sort.*' table properties are not supported for Kudu tables.");
+
+    // ALTER TABLE SET TBLPROPERTIES for sort.order
+    AnalysisError("alter table functional_kudu.alltypes set tblproperties("
+        + "'sort.order'='true')",
+        "'sort.*' table properties are not supported for Kudu tables.");
+
+    BackendConfig.INSTANCE.setZOrderSortUnlocked(false);
   }
 }
diff --git a/fe/src/test/java/org/apache/impala/analysis/ParserTest.java b/fe/src/test/java/org/apache/impala/analysis/ParserTest.java
index eebefc9..a012538 100644
--- a/fe/src/test/java/org/apache/impala/analysis/ParserTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/ParserTest.java
@@ -29,6 +29,7 @@ import org.apache.impala.analysis.TimestampArithmeticExpr.TimeUnit;
 import org.apache.impala.common.AnalysisException;
 import org.apache.impala.common.FrontendTestBase;
 import org.apache.impala.compat.MetastoreShim;
+import org.apache.impala.service.BackendConfig;
 import org.junit.Test;
 
 import com.google.common.base.Preconditions;
@@ -2466,6 +2467,18 @@ public class ParserTest extends FrontendTestBase {
   }
 
   @Test
+  public void TestAlterTableZSortBy() {
+    BackendConfig.INSTANCE.setZOrderSortUnlocked(true);
+
+    ParsesOk("ALTER TABLE TEST SORT BY ZORDER (int_col, id)");
+    ParsesOk("ALTER TABLE TEST SORT BY ZORDER ()");
+    ParserError("ALTER TABLE TEST PARTITION (year=2009, month=4) SORT BY ZORDER " +
+        "(int_col, id)");
+
+    BackendConfig.INSTANCE.setZOrderSortUnlocked(false);
+  }
+
+  @Test
   public void TestAlterTableOrViewRename() {
     for (String entity: Lists.newArrayList("TABLE", "VIEW")) {
       ParsesOk(String.format("ALTER %s TestDb.Foo RENAME TO TestDb.Foo2", entity));
@@ -2593,6 +2606,51 @@ public class ParserTest extends FrontendTestBase {
     ParsesOk("CREATE TABLE Foo LIKE PARQUET '/user/foo' SORT BY (id)");
     ParserError("CREATE TABLE Foo SORT BY (id) LIKE PARQUET '/user/foo'");
 
+    // SORT BY ZORDER clause
+    BackendConfig.INSTANCE.setZOrderSortUnlocked(true);
+
+    ParsesOk("CREATE TABLE Foo (i int, j int) SORT BY ZORDER ()");
+    ParsesOk("CREATE TABLE Foo (i int) SORT BY ZORDER (i)");
+    ParsesOk("CREATE TABLE Foo (i int) SORT BY ZORDER (j)");
+    ParsesOk("CREATE TABLE Foo (i int, j int) SORT BY ZORDER (i,j)");
+    ParsesOk("CREATE EXTERNAL TABLE Foo (i int, s string) SORT BY ZORDER (s) " +
+        "LOCATION '/test-warehouse/'");
+    ParsesOk("CREATE TABLE Foo (i int, s string) SORT BY ZORDER (s) COMMENT 'hello' " +
+        "LOCATION '/a/b/' TBLPROPERTIES ('123'='1234')");
+
+
+    // SORT BY ZORDER must be the first table option
+    ParserError("CREATE TABLE Foo (i int, s string) COMMENT 'hello' SORT BY ZORDER (s) " +
+        "LOCATION '/a/b/' TBLPROPERTIES ('123'='1234')");
+    ParserError("CREATE TABLE Foo (i int, s string) COMMENT 'hello' LOCATION '/a/b/' " +
+        "SORT BY ZORDER (s) TBLPROPERTIES ('123'='1234')");
+    ParserError("CREATE TABLE Foo (i int, s string) COMMENT 'hello' LOCATION '/a/b/' " +
+        "TBLPROPERTIES ('123'='1234') SORT BY ZORDER (s)");
+
+    // Malformed SORT BY ZORDER clauses
+    ParserError("CREATE TABLE Foo (i int, j int) SORT BY ZORDER");
+    ParserError("CREATE TABLE Foo (i int, j int) SORT BY ZORDER (i,)");
+    ParserError("CREATE TABLE Foo (i int, j int) SORT BY ZORDER (int)");
+
+    // Create table like other table with zsort columns
+    ParsesOk("CREATE TABLE Foo SORT BY ZORDER(bar) LIKE Baz STORED AS TEXTFILE " +
+        "LOCATION '/a/b'");
+    ParserError("CREATE TABLE SORT BY ZORDER(bar) Foo LIKE Baz STORED AS TEXTFILE " +
+        "LOCATION '/a/b'");
+    // SORT BY ZORDER must be the first table option
+    ParserError("CREATE TABLE Foo LIKE Baz STORED AS TEXTFILE LOCATION '/a/b' " +
+        "SORT BY ZORDER(bar)");
+
+    // CTAS with zsort columns
+    ParsesOk("CREATE TABLE Foo SORT BY ZORDER(bar) AS SELECT * FROM BAR");
+    ParserError("CREATE TABLE Foo AS SELECT * FROM BAR SORT BY ZORDER(bar)");
+
+    // Create table like file with zsort columns
+    ParsesOk("CREATE TABLE Foo LIKE PARQUET '/user/foo' SORT BY ZORDER (id)");
+    ParserError("CREATE TABLE Foo SORT BY ZORDER (id) LIKE PARQUET '/user/foo'");
+
+    BackendConfig.INSTANCE.setZOrderSortUnlocked(false);
+
     // Column comments
     ParsesOk("CREATE TABLE Foo (i int COMMENT 'hello', s string)");
     ParsesOk("CREATE TABLE Foo (i int COMMENT 'hello', s string COMMENT 'hi')");
diff --git a/fe/src/test/java/org/apache/impala/analysis/ToSqlTest.java b/fe/src/test/java/org/apache/impala/analysis/ToSqlTest.java
index 5e7c92b..8f7f0a5 100644
--- a/fe/src/test/java/org/apache/impala/analysis/ToSqlTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/ToSqlTest.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.fail;
 import org.apache.impala.authorization.Privilege;
 import org.apache.impala.common.AnalysisException;
 import org.apache.impala.common.FrontendTestBase;
+import org.apache.impala.service.BackendConfig;
 import org.apache.impala.testutil.TestUtils;
 import org.junit.Test;
 
@@ -319,16 +320,39 @@ public class ToSqlTest extends FrontendTestBase {
 
   @Test
   public void TestCreateTable() throws AnalysisException {
+    // Table with SORT BY clause.
     testToSql("create table p (a int) partitioned by (day string) sort by (a) " +
         "comment 'This is a test'",
         "default",
         "CREATE TABLE default.p ( a INT ) PARTITIONED BY ( day STRING ) " +
-        "SORT BY ( a ) COMMENT 'This is a test' STORED AS TEXTFILE" , true);
-    // Table with SORT BY clause.
+        "SORT BY LEXICAL ( a ) COMMENT 'This is a test' STORED AS TEXTFILE" , true);
     testToSql("create table p (a int, b int) partitioned by (day string) sort by (a ,b) ",
         "default",
         "CREATE TABLE default.p ( a INT, b INT ) PARTITIONED BY ( day STRING ) " +
-        "SORT BY ( a, b ) STORED AS TEXTFILE" , true);
+        "SORT BY LEXICAL ( a, b ) STORED AS TEXTFILE" , true);
+
+    // Table with SORT BY LEXICAL clause.
+    testToSql("create table p (a int) partitioned by (day string) sort by lexical (a) " +
+        "comment 'This is a test'",
+        "default",
+        "CREATE TABLE default.p ( a INT ) PARTITIONED BY ( day STRING ) " +
+        "SORT BY LEXICAL ( a ) COMMENT 'This is a test' STORED AS TEXTFILE" , true);
+    testToSql("create table p (a int, b int) partitioned by (day string) sort by " +
+        "lexical (a, b)",
+        "default",
+        "CREATE TABLE default.p ( a INT, b INT ) PARTITIONED BY ( day STRING ) " +
+        "SORT BY LEXICAL ( a, b ) STORED AS TEXTFILE" , true);
+
+    // Table with SORT BY ZORDER clause.
+    BackendConfig.INSTANCE.setZOrderSortUnlocked(true);
+
+    testToSql("create table p (a int, b int) partitioned by (day string) sort by zorder" +
+        "(a ,b) ", "default",
+        "CREATE TABLE default.p ( a INT, b INT ) PARTITIONED BY ( day STRING ) " +
+        "SORT BY ZORDER ( a, b ) STORED AS TEXTFILE" , true);
+
+    BackendConfig.INSTANCE.setZOrderSortUnlocked(false);
+
     // Kudu table with a TIMESTAMP column default value
     String kuduMasters = catalog_.getDefaultKuduMasterHosts();
     testToSql(String.format("create table p (a bigint primary key, " +
@@ -360,9 +384,24 @@ public class ToSqlTest extends FrontendTestBase {
     // Table with SORT BY clause.
     testToSql("create table p partitioned by (int_col) sort by (string_col) as " +
         "select double_col, string_col, int_col from functional.alltypes", "default",
-        "CREATE TABLE default.p PARTITIONED BY ( int_col ) SORT BY ( string_col ) " +
-        "STORED AS TEXTFILE AS SELECT double_col, string_col, int_col FROM " +
-        "functional.alltypes", true);
+        "CREATE TABLE default.p PARTITIONED BY ( int_col ) SORT BY LEXICAL " +
+        "( string_col ) STORED AS TEXTFILE AS SELECT double_col, string_col, int_col " +
+        "FROM functional.alltypes", true);
+    // Table with SORT BY LEXICAL clause.
+    testToSql("create table p partitioned by (int_col) sort by lexical (string_col) as " +
+        "select double_col, string_col, int_col from functional.alltypes", "default",
+        "CREATE TABLE default.p PARTITIONED BY ( int_col ) SORT BY LEXICAL " +
+        "( string_col ) STORED AS TEXTFILE AS SELECT double_col, string_col, int_col " +
+        "FROM functional.alltypes", true);
+    // Table with SORT BY ZORDER clause.
+    BackendConfig.INSTANCE.setZOrderSortUnlocked(true);
+    testToSql("create table p partitioned by (string_col) sort by zorder (int_col, " +
+        "bool_col) as select int_col, bool_col, string_col from functional.alltypes",
+        "default",
+        "CREATE TABLE default.p PARTITIONED BY ( string_col ) SORT BY ZORDER " +
+        "( int_col, bool_col ) STORED AS TEXTFILE AS SELECT " +
+        "int_col, bool_col, string_col FROM functional.alltypes", true);
+    BackendConfig.INSTANCE.setZOrderSortUnlocked(false);
     // Kudu table with multiple partition params
     String kuduMasters = catalog_.getDefaultKuduMasterHosts();
     testToSql(String.format("create table p primary key (a,b) " +
@@ -383,7 +422,16 @@ public class ToSqlTest extends FrontendTestBase {
         "CREATE TABLE p LIKE functional.alltypes");
     // Table with sort columns.
     testToSql("create table p sort by (id) like functional.alltypes", "default",
-        "CREATE TABLE p SORT BY (id) LIKE functional.alltypes");
+        "CREATE TABLE p SORT BY LEXICAL (id) LIKE functional.alltypes");
+    // Table with LEXICAL sort columns.
+    testToSql("create table p sort by LEXICAL (id) like functional.alltypes", "default",
+        "CREATE TABLE p SORT BY LEXICAL (id) LIKE functional.alltypes");
+    // Table with ZORDER sort columns.
+    BackendConfig.INSTANCE.setZOrderSortUnlocked(true);
+    testToSql("create table p sort by zorder (bool_col, int_col) like " +
+        "functional.alltypes",  "default",
+        "CREATE TABLE p SORT BY ZORDER (bool_col,int_col) LIKE functional.alltypes");
+    BackendConfig.INSTANCE.setZOrderSortUnlocked(false);
   }
 
   @Test
@@ -398,7 +446,22 @@ public class ToSqlTest extends FrontendTestBase {
         "'/test-warehouse/schemas/alltypestiny.parquet' sort by (int_col, id)", "default",
         "CREATE TABLE IF NOT EXISTS default.p LIKE PARQUET " +
         "'hdfs://localhost:20500/test-warehouse/schemas/alltypestiny.parquet' " +
-        "SORT BY ( int_col, id ) STORED AS TEXTFILE", true);
+        "SORT BY LEXICAL ( int_col, id ) STORED AS TEXTFILE", true);
+    // Table with LEXICAL sort columns.
+    testToSql("create table if not exists p like parquet " +
+        "'/test-warehouse/schemas/alltypestiny.parquet' sort by lexical (int_col, id)",
+        "default",
+        "CREATE TABLE IF NOT EXISTS default.p LIKE PARQUET " +
+        "'hdfs://localhost:20500/test-warehouse/schemas/alltypestiny.parquet' " +
+        "SORT BY LEXICAL ( int_col, id ) STORED AS TEXTFILE", true);
+    // Table with ZORDER sort columns.
+    BackendConfig.INSTANCE.setZOrderSortUnlocked(true);
+    testToSql("create table if not exists p like parquet " +
+        "'/test-warehouse/schemas/alltypestiny.parquet' sort by zorder (int_col, id)",
+        "default", "CREATE TABLE IF NOT EXISTS default.p LIKE PARQUET " +
+        "'hdfs://localhost:20500/test-warehouse/schemas/alltypestiny.parquet' " +
+        "SORT BY ZORDER ( int_col, id ) STORED AS TEXTFILE", true);
+    BackendConfig.INSTANCE.setZOrderSortUnlocked(false);
   }
 
   @Test
diff --git a/fe/src/test/java/org/apache/impala/analysis/ToSqlUtilsTest.java b/fe/src/test/java/org/apache/impala/analysis/ToSqlUtilsTest.java
index e33ee01..936eed7 100644
--- a/fe/src/test/java/org/apache/impala/analysis/ToSqlUtilsTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/ToSqlUtilsTest.java
@@ -36,7 +36,9 @@ import org.apache.impala.catalog.KuduTable;
 import org.apache.impala.catalog.ScalarFunction;
 import org.apache.impala.catalog.Type;
 import org.apache.impala.common.FrontendTestBase;
+import org.apache.impala.service.BackendConfig;
 import org.apache.impala.thrift.TFunctionBinaryType;
+import org.apache.impala.thrift.TSortingOrder;
 import org.junit.Test;
 
 import com.google.common.collect.Lists;
@@ -133,6 +135,26 @@ public class ToSqlUtilsTest extends FrontendTestBase {
     // quoted column with a comma in the name: `foo,bar`.
   }
 
+  @Test
+  public void testSortOrder() {
+    BackendConfig.INSTANCE.setZOrderSortUnlocked(true);
+
+    Map<String,String> props = new HashMap<>();
+    props.put("foo", "foo-value");
+    // Sorting order is LEXICAL by default
+    assertEquals("LEXICAL", ToSqlUtils.getSortingOrder(props));
+
+    props.put(AlterTableSortByStmt.TBL_PROP_SORT_ORDER, "ZORDER");
+    // Returns ZORDER when the sort.order property is set to ZORDER
+    assertEquals("ZORDER", ToSqlUtils.getSortingOrder(props));
+
+    props.put(AlterTableSortByStmt.TBL_PROP_SORT_ORDER, "LEXICAL");
+    // Returns LEXICAL when the sort.order property is set back to LEXICAL
+    assertEquals("LEXICAL", ToSqlUtils.getSortingOrder(props));
+
+    BackendConfig.INSTANCE.setZOrderSortUnlocked(false);
+  }
+
   private FeTable getTable(String dbName, String tableName) {
     FeTable table = catalog_.getOrLoadTable(dbName, tableName);
     assertNotNull(table);
diff --git a/fe/src/test/java/org/apache/impala/authorization/AuthorizationStmtTest.java b/fe/src/test/java/org/apache/impala/authorization/AuthorizationStmtTest.java
index f4980d0..8bcfbd2 100644
--- a/fe/src/test/java/org/apache/impala/authorization/AuthorizationStmtTest.java
+++ b/fe/src/test/java/org/apache/impala/authorization/AuthorizationStmtTest.java
@@ -19,6 +19,7 @@ package org.apache.impala.authorization;
 
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Sets;
+
 import org.apache.commons.lang.ArrayUtils;
 import org.apache.impala.analysis.AnalysisContext;
 import org.apache.impala.analysis.AnalysisContext.AnalysisResult;
@@ -27,6 +28,7 @@ import org.apache.impala.catalog.Type;
 import org.apache.impala.common.AnalysisException;
 import org.apache.impala.common.ImpalaException;
 import org.apache.impala.common.RuntimeEnv;
+import org.apache.impala.service.BackendConfig;
 import org.apache.impala.testutil.TestUtils;
 import org.apache.impala.thrift.TDescribeOutputStyle;
 import org.apache.impala.thrift.TPrivilegeLevel;
@@ -2054,6 +2056,7 @@ public class AuthorizationStmtTest extends AuthorizationTestBase {
 
   @Test
   public void testAlterTable() throws ImpalaException {
+    BackendConfig.INSTANCE.setZOrderSortUnlocked(true);
     for (AuthzTest test: new AuthzTest[]{
         authorize("alter table functional.alltypes add column c1 int"),
         authorize("alter table functional.alltypes add columns(c1 int)"),
@@ -2068,6 +2071,7 @@ public class AuthorizationStmtTest extends AuthorizationTestBase {
         authorize("alter table functional.alltypes partition(year=2009) set cached " +
             "in 'testPool'"),
         authorize("alter table functional.alltypes sort by (id)"),
+        authorize("alter table functional.alltypes sort by zorder (id, bool_col)"),
         authorize("alter table functional.alltypes set column stats int_col " +
             "('numNulls'='1')"),
         authorize("alter table functional.alltypes recover partitions"),
@@ -2096,6 +2100,7 @@ public class AuthorizationStmtTest extends AuthorizationTestBase {
               allExcept(TPrivilegeLevel.ALL, TPrivilegeLevel.OWNER,
               TPrivilegeLevel.ALTER)));
     }
+    BackendConfig.INSTANCE.setZOrderSortUnlocked(false);
 
     try {
       // We cannot set an owner to a role that doesn't exist
diff --git a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
index 7cabb86..4873caf 100644
--- a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
+++ b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
@@ -30,6 +30,7 @@ import org.apache.impala.catalog.Type;
 import org.apache.impala.common.ImpalaException;
 import org.apache.impala.common.RuntimeEnv;
 import org.apache.impala.datagenerator.HBaseTestDataRegionAssignment;
+import org.apache.impala.service.BackendConfig;
 import org.apache.impala.service.Frontend.PlanCtx;
 import org.apache.impala.testutil.TestUtils;
 import org.apache.impala.testutil.TestUtils.IgnoreValueFilter;
@@ -285,6 +286,23 @@ public class PlannerTest extends PlannerTestBase {
   }
 
   @Test
+  public void testInsertSortByZorder() {
+    // Add a test table with a SORT BY ZORDER clause to test that the corresponding sort
+    // nodes are added by the insert statements in insert-sort-by-zorder.test.
+    BackendConfig.INSTANCE.setZOrderSortUnlocked(true);
+
+    addTestDb("test_sort_by_zorder", "Test DB for SORT BY ZORDER clause.");
+    addTestTable("create table test_sort_by_zorder.t (id int, int_col int, " +
+        "bool_col boolean) partitioned by (year int, month int) " +
+        "sort by zorder (int_col, bool_col) location '/'");
+    addTestTable("create table test_sort_by_zorder.t_nopart (id int, int_col int, " +
+        "bool_col boolean) sort by zorder (int_col, bool_col) location '/'");
+    runPlannerTestFile("insert-sort-by-zorder", "test_sort_by_zorder");
+
+    BackendConfig.INSTANCE.setZOrderSortUnlocked(false);
+  }
+
+  @Test
   public void testHdfs() {
     runPlannerTestFile("hdfs");
   }
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/insert-sort-by-zorder.test b/testdata/workloads/functional-planner/queries/PlannerTest/insert-sort-by-zorder.test
new file mode 100644
index 0000000..25fdc82
--- /dev/null
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/insert-sort-by-zorder.test
@@ -0,0 +1,331 @@
+# IMPALA-4166: insert into tables with sort.columns property adds sort node. Clustering
+# columns are added to the sort columns.
+insert into table test_sort_by_zorder.t partition(year, month) /*+ shuffle */
+select id, int_col, bool_col, year, month from functional.alltypes
+---- PLAN
+WRITE TO HDFS [test_sort_by_zorder.t, OVERWRITE=false, PARTITION-KEYS=(year,month)]
+|  partitions=24
+|
+01:SORT
+|  order by: ZORDER: year, month, int_col, bool_col
+|  row-size=17B cardinality=7.30K
+|
+00:SCAN HDFS [functional.alltypes]
+   HDFS partitions=24/24 files=24 size=478.45KB
+   row-size=17B cardinality=7.30K
+---- DISTRIBUTEDPLAN
+WRITE TO HDFS [test_sort_by_zorder.t, OVERWRITE=false, PARTITION-KEYS=(year,month)]
+|  partitions=24
+|
+02:SORT
+|  order by: ZORDER: year, month, int_col, bool_col
+|  row-size=17B cardinality=7.30K
+|
+01:EXCHANGE [HASH(`year`,`month`)]
+|
+00:SCAN HDFS [functional.alltypes]
+   HDFS partitions=24/24 files=24 size=478.45KB
+   row-size=17B cardinality=7.30K
+====
+# IMPALA-4166: insert with noshuffle hint into tables with sort.columns property adds
+# sort node.
+insert into table test_sort_by_zorder.t partition(year, month) /*+ noshuffle */
+select id, int_col, bool_col, year, month from functional.alltypes
+---- PLAN
+WRITE TO HDFS [test_sort_by_zorder.t, OVERWRITE=false, PARTITION-KEYS=(year,month)]
+|  partitions=24
+|
+01:SORT
+|  order by: ZORDER: year, month, int_col, bool_col
+|  row-size=17B cardinality=7.30K
+|
+00:SCAN HDFS [functional.alltypes]
+   HDFS partitions=24/24 files=24 size=478.45KB
+   row-size=17B cardinality=7.30K
+---- DISTRIBUTEDPLAN
+WRITE TO HDFS [test_sort_by_zorder.t, OVERWRITE=false, PARTITION-KEYS=(year,month)]
+|  partitions=24
+|
+01:SORT
+|  order by: ZORDER: year, month, int_col, bool_col
+|  row-size=17B cardinality=7.30K
+|
+00:SCAN HDFS [functional.alltypes]
+   HDFS partitions=24/24 files=24 size=478.45KB
+   row-size=17B cardinality=7.30K
+====
+# IMPALA-4166: insert into tables with sort.columns property adds sort node. Clustering
+# columns are added to the sort columns. noclustered hint is ignored when sort.columns
+# are specified.
+insert into table test_sort_by_zorder.t partition(year, month) /*+ noclustered */
+select id, int_col, bool_col, year, month from functional.alltypes
+---- PLAN
+WRITE TO HDFS [test_sort_by_zorder.t, OVERWRITE=false, PARTITION-KEYS=(year,month)]
+|  partitions=24
+|
+01:SORT
+|  order by: ZORDER: year, month, int_col, bool_col
+|  row-size=17B cardinality=7.30K
+|
+00:SCAN HDFS [functional.alltypes]
+   HDFS partitions=24/24 files=24 size=478.45KB
+   row-size=17B cardinality=7.30K
+---- DISTRIBUTEDPLAN
+WRITE TO HDFS [test_sort_by_zorder.t, OVERWRITE=false, PARTITION-KEYS=(year,month)]
+|  partitions=24
+|
+02:SORT
+|  order by: ZORDER: year, month, int_col, bool_col
+|  row-size=17B cardinality=7.30K
+|
+01:EXCHANGE [HASH(`year`,`month`)]
+|
+00:SCAN HDFS [functional.alltypes]
+   HDFS partitions=24/24 files=24 size=478.45KB
+   row-size=17B cardinality=7.30K
+====
+# IMPALA-4166: insert into tables with sort.columns property adds sort node.
+insert into table test_sort_by_zorder.t_nopart /*+ shuffle */
+select id, int_col, bool_col from functional.alltypes
+---- PLAN
+WRITE TO HDFS [test_sort_by_zorder.t_nopart, OVERWRITE=false]
+|  partitions=1
+|
+01:SORT
+|  order by: ZORDER: int_col, bool_col
+|  row-size=9B cardinality=7.30K
+|
+00:SCAN HDFS [functional.alltypes]
+   HDFS partitions=24/24 files=24 size=478.45KB
+   row-size=9B cardinality=7.30K
+---- DISTRIBUTEDPLAN
+WRITE TO HDFS [test_sort_by_zorder.t_nopart, OVERWRITE=false]
+|  partitions=1
+|
+02:SORT
+|  order by: ZORDER: int_col, bool_col
+|  row-size=9B cardinality=7.30K
+|
+01:EXCHANGE [UNPARTITIONED]
+|
+00:SCAN HDFS [functional.alltypes]
+   HDFS partitions=24/24 files=24 size=478.45KB
+   row-size=9B cardinality=7.30K
+====
+# IMPALA-4166: insert with noshuffle hint into tables with sort.columns property adds
+# sort node.
+insert into table test_sort_by_zorder.t_nopart /*+ noshuffle */
+select id, int_col, bool_col from functional.alltypes
+---- PLAN
+WRITE TO HDFS [test_sort_by_zorder.t_nopart, OVERWRITE=false]
+|  partitions=1
+|
+01:SORT
+|  order by: ZORDER: int_col, bool_col
+|  row-size=9B cardinality=7.30K
+|
+00:SCAN HDFS [functional.alltypes]
+   HDFS partitions=24/24 files=24 size=478.45KB
+   row-size=9B cardinality=7.30K
+---- DISTRIBUTEDPLAN
+WRITE TO HDFS [test_sort_by_zorder.t_nopart, OVERWRITE=false]
+|  partitions=1
+|
+01:SORT
+|  order by: ZORDER: int_col, bool_col
+|  row-size=9B cardinality=7.30K
+|
+00:SCAN HDFS [functional.alltypes]
+   HDFS partitions=24/24 files=24 size=478.45KB
+   row-size=9B cardinality=7.30K
+====
+# IMPALA-4166: sort columns are correct when using an identity column permutation.
+insert into table test_sort_by_zorder.t_nopart (id, int_col, bool_col) /*+ shuffle */
+select id, int_col, bool_col from functional.alltypes
+---- PLAN
+WRITE TO HDFS [test_sort_by_zorder.t_nopart, OVERWRITE=false]
+|  partitions=1
+|
+01:SORT
+|  order by: ZORDER: int_col, bool_col
+|  row-size=9B cardinality=7.30K
+|
+00:SCAN HDFS [functional.alltypes]
+   HDFS partitions=24/24 files=24 size=478.45KB
+   row-size=9B cardinality=7.30K
+---- DISTRIBUTEDPLAN
+WRITE TO HDFS [test_sort_by_zorder.t_nopart, OVERWRITE=false]
+|  partitions=1
+|
+02:SORT
+|  order by: ZORDER: int_col, bool_col
+|  row-size=9B cardinality=7.30K
+|
+01:EXCHANGE [UNPARTITIONED]
+|
+00:SCAN HDFS [functional.alltypes]
+   HDFS partitions=24/24 files=24 size=478.45KB
+   row-size=9B cardinality=7.30K
+====
+# IMPALA-4166: sort columns are correct when using a non-trivial column permutation.
+insert into table test_sort_by_zorder.t_nopart (bool_col, id, int_col) /*+ shuffle */
+select bool_col, id, int_col from functional.alltypes
+---- PLAN
+WRITE TO HDFS [test_sort_by_zorder.t_nopart, OVERWRITE=false]
+|  partitions=1
+|
+01:SORT
+|  order by: ZORDER: int_col, bool_col
+|  row-size=9B cardinality=7.30K
+|
+00:SCAN HDFS [functional.alltypes]
+   HDFS partitions=24/24 files=24 size=478.45KB
+   row-size=9B cardinality=7.30K
+---- DISTRIBUTEDPLAN
+WRITE TO HDFS [test_sort_by_zorder.t_nopart, OVERWRITE=false]
+|  partitions=1
+|
+02:SORT
+|  order by: ZORDER: int_col, bool_col
+|  row-size=9B cardinality=7.30K
+|
+01:EXCHANGE [UNPARTITIONED]
+|
+00:SCAN HDFS [functional.alltypes]
+   HDFS partitions=24/24 files=24 size=478.45KB
+   row-size=9B cardinality=7.30K
+====
+# IMPALA-4166: sort columns with a join
+insert into table test_sort_by_zorder.t partition(year, month) /*+ noclustered */
+select a.id, b.int_col, a.bool_col, b.year, a.month
+from functional.alltypes a join functional.alltypes b on a.id = b.id order by b.string_col
+limit 10
+---- PLAN
+WRITE TO HDFS [test_sort_by_zorder.t, OVERWRITE=false, PARTITION-KEYS=(year,month)]
+|  partitions=24
+|
+04:SORT
+|  order by: ZORDER: year, month, int_col, bool_col
+|  row-size=17B cardinality=10
+|
+03:TOP-N [LIMIT=10]
+|  order by: string_col ASC
+|  row-size=30B cardinality=10
+|
+02:HASH JOIN [INNER JOIN]
+|  hash predicates: b.id = a.id
+|  runtime filters: RF000 <- a.id
+|  row-size=34B cardinality=7.30K
+|
+|--00:SCAN HDFS [functional.alltypes a]
+|     HDFS partitions=24/24 files=24 size=478.45KB
+|     row-size=9B cardinality=7.30K
+|
+01:SCAN HDFS [functional.alltypes b]
+   HDFS partitions=24/24 files=24 size=478.45KB
+   runtime filters: RF000 -> b.id
+   row-size=25B cardinality=7.30K
+---- DISTRIBUTEDPLAN
+WRITE TO HDFS [test_sort_by_zorder.t, OVERWRITE=false, PARTITION-KEYS=(year,month)]
+|  partitions=24
+|
+07:SORT
+|  order by: ZORDER: year, month, int_col, bool_col
+|  row-size=17B cardinality=10
+|
+06:MERGING-EXCHANGE [UNPARTITIONED]
+|  order by: string_col ASC
+|  limit: 10
+|
+03:TOP-N [LIMIT=10]
+|  order by: string_col ASC
+|  row-size=30B cardinality=10
+|
+02:HASH JOIN [INNER JOIN, PARTITIONED]
+|  hash predicates: b.id = a.id
+|  runtime filters: RF000 <- a.id
+|  row-size=34B cardinality=7.30K
+|
+|--05:EXCHANGE [HASH(a.id)]
+|  |
+|  00:SCAN HDFS [functional.alltypes a]
+|     HDFS partitions=24/24 files=24 size=478.45KB
+|     row-size=9B cardinality=7.30K
+|
+04:EXCHANGE [HASH(b.id)]
+|
+01:SCAN HDFS [functional.alltypes b]
+   HDFS partitions=24/24 files=24 size=478.45KB
+   runtime filters: RF000 -> b.id
+   row-size=25B cardinality=7.30K
+====
+# IMPALA-4166: sort columns with a join and agg
+insert into table test_sort_by_zorder.t partition(year, month) /*+ noclustered */
+select a.id, max(b.int_col), min(a.bool_col), b.year, a.month
+from functional.alltypes a join functional.alltypes b on a.id = b.id
+group by a.id, b.year, a.month
+---- PLAN
+WRITE TO HDFS [test_sort_by_zorder.t, OVERWRITE=false, PARTITION-KEYS=(b.`year`,a.`month`)]
+|  partitions=24
+|
+04:SORT
+|  order by: ZORDER: b.`year`, a.`month`, max(b.int_col), min(a.bool_col)
+|  row-size=17B cardinality=7.30K
+|
+03:AGGREGATE [FINALIZE]
+|  output: max(b.int_col), min(a.bool_col)
+|  group by: a.id, b.`year`, a.`month`
+|  row-size=17B cardinality=7.30K
+|
+02:HASH JOIN [INNER JOIN]
+|  hash predicates: b.id = a.id
+|  runtime filters: RF000 <- a.id
+|  row-size=21B cardinality=7.30K
+|
+|--00:SCAN HDFS [functional.alltypes a]
+|     HDFS partitions=24/24 files=24 size=478.45KB
+|     row-size=9B cardinality=7.30K
+|
+01:SCAN HDFS [functional.alltypes b]
+   HDFS partitions=24/24 files=24 size=478.45KB
+   runtime filters: RF000 -> b.id
+   row-size=12B cardinality=7.30K
+---- DISTRIBUTEDPLAN
+WRITE TO HDFS [test_sort_by_zorder.t, OVERWRITE=false, PARTITION-KEYS=(b.`year`,a.`month`)]
+|  partitions=24
+|
+09:SORT
+|  order by: ZORDER: b.`year`, a.`month`, max(b.int_col), min(a.bool_col)
+|  row-size=17B cardinality=7.30K
+|
+08:EXCHANGE [HASH(b.`year`,a.`month`)]
+|
+07:AGGREGATE [FINALIZE]
+|  output: max:merge(b.int_col), min:merge(a.bool_col)
+|  group by: a.id, b.`year`, a.`month`
+|  row-size=17B cardinality=7.30K
+|
+06:EXCHANGE [HASH(a.id,b.`year`,a.`month`)]
+|
+03:AGGREGATE [STREAMING]
+|  output: max(b.int_col), min(a.bool_col)
+|  group by: a.id, b.`year`, a.`month`
+|  row-size=17B cardinality=7.30K
+|
+02:HASH JOIN [INNER JOIN, PARTITIONED]
+|  hash predicates: b.id = a.id
+|  runtime filters: RF000 <- a.id
+|  row-size=21B cardinality=7.30K
+|
+|--05:EXCHANGE [HASH(a.id)]
+|  |
+|  00:SCAN HDFS [functional.alltypes a]
+|     HDFS partitions=24/24 files=24 size=478.45KB
+|     row-size=9B cardinality=7.30K
+|
+04:EXCHANGE [HASH(b.id)]
+|
+01:SCAN HDFS [functional.alltypes b]
+   HDFS partitions=24/24 files=24 size=478.45KB
+   runtime filters: RF000 -> b.id
+   row-size=12B cardinality=7.30K
\ No newline at end of file
diff --git a/testdata/workloads/functional-query/queries/QueryTest/alter-table-zorder.test b/testdata/workloads/functional-query/queries/QueryTest/alter-table-zorder.test
new file mode 100644
index 0000000..cdd18bb
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/alter-table-zorder.test
@@ -0,0 +1,244 @@
+====
+---- QUERY
+create table insert_data_z (i int, d int, f int, b boolean);
+insert into insert_data_z values (1, 2, 3, false), (4, 5, 6, true);
+====
+---- QUERY
+create table insert_zsorted (i int, d int, f int, b boolean);
+====
+---- QUERY
+# Test setting the sort.columns and sort.order properties
+alter table insert_zsorted sort by zorder(i, d);
+describe formatted insert_zsorted;
+---- RESULTS: VERIFY_IS_SUBSET
+'','sort.columns        ','i,d                 '
+'','sort.order          ','ZORDER              '
+---- TYPES
+STRING,STRING,STRING
+====
+---- QUERY
+# Test inserting after alter table
+insert into table insert_zsorted select i, d, f, b from insert_data_z;
+---- RUNTIME_PROFILE
+row_regex: .*order by: ZORDER: i, d
+====
+---- QUERY
+# Test selection after alter table
+select count(*) from insert_zsorted;
+---- RESULTS
+2
+====
+---- QUERY
+# Test altering the sort.columns and sort.order properties
+alter table insert_zsorted sort by zorder(b, d, f);
+describe formatted insert_zsorted;
+---- RESULTS: VERIFY_IS_SUBSET
+'','sort.columns        ','b,d,f               '
+'','sort.order          ','ZORDER              '
+---- TYPES
+STRING,STRING,STRING
+====
+---- QUERY
+# Test inserting after alter table
+insert into table insert_zsorted select i, d, f, b from insert_data_z;
+---- RUNTIME_PROFILE
+row_regex: .*order by: ZORDER: b, d, f
+====
+---- QUERY
+# Test selection after alter table
+select count(*) from insert_zsorted;
+---- RESULTS
+4
+====
+---- QUERY
+# Test renaming a column in the sort by zorder list.
+alter table insert_zsorted change d e int;
+describe formatted insert_zsorted;
+---- RESULTS: VERIFY_IS_SUBSET
+'','sort.columns        ','b,e,f               '
+'','sort.order          ','ZORDER              '
+---- TYPES
+STRING,STRING,STRING
+====
+---- QUERY
+# Test inserting after alter table
+insert into table insert_zsorted select i, d, f, b from insert_data_z;
+---- RUNTIME_PROFILE
+row_regex: .*order by: ZORDER: b, d, f
+====
+---- QUERY
+# Test selection after alter table
+select count(*) from insert_zsorted;
+---- RESULTS
+6
+====
+---- QUERY
+# Test replacing the column list, including a column in the sort by zorder list.
+alter table insert_zsorted replace columns (i bigint, e decimal(12,2), f boolean);
+describe formatted insert_zsorted;
+---- RESULTS: VERIFY_IS_SUBSET
+'','sort.columns        ','e,f                 '
+'','sort.order          ','ZORDER              '
+---- TYPES
+STRING,STRING,STRING
+====
+---- QUERY
+# Test inserting after alter table
+insert into table insert_zsorted select i, cast(d as decimal(12,2)), b from insert_data_z;
+---- RUNTIME_PROFILE
+row_regex: .*order by: ZORDER: CAST\(d AS DECIMAL\(12,2\)\), b
+====
+---- QUERY
+# Test selection after alter table
+select count(*) from insert_zsorted;
+---- RESULTS
+8
+====
+---- QUERY
+# Test dropping a column in the sort by list
+alter table insert_zsorted drop column f;
+describe formatted insert_zsorted;
+---- RESULTS: VERIFY_IS_SUBSET
+'','sort.columns        ','e                   '
+'','sort.order          ','ZORDER              '
+---- TYPES
+STRING,STRING,STRING
+====
+---- QUERY
+# Test that a dropped column cannot be added as a sort column
+alter table insert_zsorted sort by zorder(f);
+---- CATCH
+AnalysisException: Could not find SORT BY column 'f' in table.
+====
+---- QUERY
+# Test that erroneous query didn't change sort columns
+describe formatted insert_zsorted;
+---- RESULTS: VERIFY_IS_SUBSET
+'','sort.columns        ','e                   '
+'','sort.order          ','ZORDER              '
+---- TYPES
+STRING,STRING,STRING
+====
+---- QUERY
+# Test removing the sort.columns property
+alter table insert_zsorted sort by zorder ();
+describe formatted insert_zsorted;
+---- RESULTS: VERIFY_IS_SUBSET
+'','sort.columns        ','                    '
+---- TYPES
+STRING,STRING,STRING
+====
+---- QUERY
+# Test inserting after alter table
+insert into table insert_zsorted select i, cast(d as decimal(12,2)) from insert_data_z;
+---- RUNTIME_PROFILE
+aggregation(SUM, InitialRunsCreated): 0
+====
+---- QUERY
+# Test selection after alter table
+select count(*) from insert_zsorted;
+---- RESULTS
+10
+====
+---- QUERY
+create table insert_zsorted_partitioned (i int, d int, f int, b boolean) partitioned by (p int) sort by zorder (i, d);
+====
+---- QUERY
+# Test removing all sort columns.
+alter table insert_zsorted_partitioned sort by zorder();
+describe formatted insert_zsorted_partitioned;
+---- RESULTS: VERIFY_IS_NOT_IN
+'','sort.columns        ','i,d                 '
+---- TYPES
+STRING,STRING,STRING
+====
+---- QUERY
+# Test inserting after alter table
+insert into table insert_zsorted_partitioned partition (p=1) select i, d, f, b from insert_data_z;
+---- RUNTIME_PROFILE
+aggregation(SUM, InitialRunsCreated): 0
+====
+---- QUERY
+# Test selection after alter table
+select count(*) from insert_zsorted_partitioned;
+---- RESULTS
+2
+====
+---- QUERY
+# Re-add sort columns.
+alter table insert_zsorted_partitioned sort by zorder(d, i);
+describe formatted insert_zsorted_partitioned;
+---- RESULTS: VERIFY_IS_SUBSET
+'','sort.columns        ','d,i                 '
+'','sort.order          ','ZORDER              '
+---- TYPES
+STRING,STRING,STRING
+====
+---- QUERY
+insert into table insert_zsorted_partitioned partition (p=1) select i, d, f, b from insert_data_z;
+---- RUNTIME_PROFILE
+row_regex: .*order by: ZORDER: d, i
+====
+---- QUERY
+# Test selection after alter table
+select count(*) from insert_zsorted_partitioned;
+---- RESULTS
+4
+====
+---- QUERY
+describe formatted insert_zsorted_partitioned;
+---- RESULTS: VERIFY_IS_SUBSET
+'','sort.columns        ','d,i                 '
+'','sort.order          ','ZORDER              '
+---- TYPES
+STRING,STRING,STRING
+====
+---- QUERY
+# Test dropping one sort column, so that one remains.
+alter table insert_zsorted_partitioned drop column i;
+describe formatted insert_zsorted_partitioned;
+---- RESULTS: VERIFY_IS_NOT_IN
+'','sort.columns        ','d,i                 '
+---- TYPES
+STRING,STRING,STRING
+====
+---- QUERY
+# Create a table with ZORDER sort columns; the statements below swap the sorting order.
+create table swapping_table (i int, j int, k int) sort by zorder (i, j);
+describe formatted swapping_table;
+---- RESULTS: VERIFY_IS_SUBSET
+'','sort.columns        ','i,j                 '
+'','sort.order          ','ZORDER              '
+---- TYPES
+STRING,STRING,STRING
+====
+---- QUERY
+# Swap the sorting order from ZORDER to LEXICAL.
+alter table swapping_table sort by lexical (i);
+describe formatted swapping_table;
+---- RESULTS: VERIFY_IS_SUBSET
+'','sort.columns        ','i                   '
+'','sort.order          ','LEXICAL             '
+---- TYPES
+STRING,STRING,STRING
+====
+---- QUERY
+# Swap the sorting order back to ZORDER with an extended column list.
+alter table swapping_table sort by zorder (i, j, k);
+describe formatted swapping_table;
+---- RESULTS: VERIFY_IS_SUBSET
+'','sort.columns        ','i,j,k               '
+'','sort.order          ','ZORDER              '
+---- TYPES
+STRING,STRING,STRING
+====
+---- QUERY
+# Clear the sort columns; the sort order defaults back to LEXICAL.
+alter table swapping_table sort by ();
+describe formatted swapping_table;
+---- RESULTS: VERIFY_IS_SUBSET
+'','sort.columns        ','                    '
+'','sort.order          ','LEXICAL             '
+---- TYPES
+STRING,STRING,STRING
+====
diff --git a/testdata/workloads/functional-query/queries/QueryTest/create-table-as-select-zorder.test b/testdata/workloads/functional-query/queries/QueryTest/create-table-as-select-zorder.test
new file mode 100644
index 0000000..b3a429a
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/create-table-as-select-zorder.test
@@ -0,0 +1,12 @@
+====
+---- QUERY
+# Test adding sort.columns and sort.order when creating a table from a select query.
+create table zsortbytest sort by zorder (int_col, bool_col) as
+select * from functional.alltypessmall;
+describe formatted zsortbytest;
+---- RESULTS: VERIFY_IS_SUBSET
+'','sort.columns        ','int_col,bool_col    '
+'','sort.order          ','ZORDER              '
+---- TYPES
+STRING,STRING,STRING
+====
\ No newline at end of file
diff --git a/testdata/workloads/functional-query/queries/QueryTest/create-table-like-file-zorder.test b/testdata/workloads/functional-query/queries/QueryTest/create-table-like-file-zorder.test
new file mode 100644
index 0000000..d9d04c1
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/create-table-like-file-zorder.test
@@ -0,0 +1,13 @@
+====
+---- QUERY
+# Test adding sort.columns and sort.order when creating a table like a parquet file.
+create table $DATABASE.decimal_file like parquet
+'$FILESYSTEM_PREFIX/test-warehouse/schemas/decimal.parquet'
+sort by zorder (d32, d11) stored as textfile;
+describe formatted $DATABASE.decimal_file;
+---- RESULTS: VERIFY_IS_SUBSET
+'','sort.columns        ','d32,d11             '
+'','sort.order          ','ZORDER              '
+---- TYPES
+STRING,STRING,STRING
+====
\ No newline at end of file
diff --git a/testdata/workloads/functional-query/queries/QueryTest/create-table-like-table-zorder.test b/testdata/workloads/functional-query/queries/QueryTest/create-table-like-table-zorder.test
new file mode 100644
index 0000000..4c2e014
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/create-table-like-table-zorder.test
@@ -0,0 +1,31 @@
+====
+---- QUERY
+# Test setting sort.columns and sort.order when using create table like.
+create table zsortbytest sort by zorder (int_col, bool_col) like functional.alltypes;
+describe formatted zsortbytest;
+---- RESULTS: VERIFY_IS_SUBSET
+'','sort.columns        ','int_col,bool_col    '
+'','sort.order          ','ZORDER              '
+---- TYPES
+STRING,STRING,STRING
+====
+---- QUERY
+# Test that sort.columns and sort.order will be inherited from the source table.
+create table zsortbytest_clone like zsortbytest;
+describe formatted zsortbytest_clone;
+---- RESULTS: VERIFY_IS_SUBSET
+'','sort.columns        ','int_col,bool_col    '
+'','sort.order          ','ZORDER              '
+---- TYPES
+STRING,STRING,STRING
+====
+---- QUERY
+# Test that sort.columns and sort.order can be overridden in the query.
+create table zsortbytest_override sort by zorder (id, int_col) like zsortbytest;
+describe formatted zsortbytest_override;
+---- RESULTS: VERIFY_IS_SUBSET
+'','sort.columns        ','id,int_col          '
+'','sort.order          ','ZORDER              '
+---- TYPES
+STRING,STRING,STRING
+====
diff --git a/testdata/workloads/functional-query/queries/QueryTest/create-table-zorder.test b/testdata/workloads/functional-query/queries/QueryTest/create-table-zorder.test
new file mode 100644
index 0000000..b1218e5
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/create-table-zorder.test
@@ -0,0 +1,12 @@
+====
+---- QUERY
+# Make sure that specifying sort by zorder columns sets the 'sort.columns'
+# and 'sort.order' properties correctly.
+create table zsortbytest (i int, d int, b boolean) sort by zorder (d, i);
+describe formatted zsortbytest;
+---- RESULTS: VERIFY_IS_SUBSET
+'','sort.columns        ','d,i                 '
+'','sort.order          ','ZORDER              '
+---- TYPES
+STRING,STRING,STRING
+====
\ No newline at end of file
diff --git a/testdata/workloads/functional-query/queries/QueryTest/show-create-table-zorder.test b/testdata/workloads/functional-query/queries/QueryTest/show-create-table-zorder.test
new file mode 100644
index 0000000..2b47d01
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/show-create-table-zorder.test
@@ -0,0 +1,25 @@
+====
+---- CREATE_TABLE
+# Simple table with Z-ordered sort columns.
+CREATE TABLE test1 (id INT, z INT)
+SORT BY ZORDER (id, z)
+STORED AS TEXTFILE
+---- RESULTS
+CREATE TABLE show_create_table_test_db.test1 (id INT, z INT)
+SORT BY ZORDER (id, z)
+STORED AS TEXTFILE
+LOCATION '$$location_uri$$'
+====
+---- CREATE_TABLE
+# Simple partitioned table with Z-ordered sort columns.
+CREATE TABLE test1 (id INT, z INT)
+PARTITIONED BY (x INT, y INT)
+SORT BY ZORDER (id, z)
+STORED AS TEXTFILE
+---- RESULTS
+CREATE TABLE show_create_table_test_db.test1 (id INT, z INT)
+PARTITIONED BY (x INT, y INT)
+SORT BY ZORDER (id, z)
+STORED AS TEXTFILE
+LOCATION '$$location_uri$$'
+====
diff --git a/testdata/workloads/functional-query/queries/QueryTest/show-create-table.test b/testdata/workloads/functional-query/queries/QueryTest/show-create-table.test
index 3c48957..a0efa71 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/show-create-table.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/show-create-table.test
@@ -355,7 +355,7 @@ SORT BY (id)
 STORED AS TEXTFILE
 ---- RESULTS
 CREATE TABLE show_create_table_test_db.test1 (id INT)
-SORT BY (id)
+SORT BY LEXICAL (id)
 STORED AS TEXTFILE
 LOCATION '$$location_uri$$'
 ====
@@ -368,7 +368,7 @@ STORED AS TEXTFILE
 ---- RESULTS
 CREATE TABLE show_create_table_test_db.test1 (id INT)
 PARTITIONED BY (x INT, y INT)
-SORT BY (id)
+SORT BY LEXICAL (id)
 STORED AS TEXTFILE
 LOCATION '$$location_uri$$'
 ====
diff --git a/tests/custom_cluster/test_zorder.py b/tests/custom_cluster/test_zorder.py
new file mode 100644
index 0000000..bb9ec12
--- /dev/null
+++ b/tests/custom_cluster/test_zorder.py
@@ -0,0 +1,302 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import shlex
+import pprint
+import re
+
+from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
+from tests.common.parametrize import UniqueDatabase
+from tests.common.skip import SkipIfLocal
+from tests.common.test_dimensions import create_uncompressed_text_dimension
+from tests.util.test_file_parser import QueryTestSectionReader, remove_comments
+
+
+# Temporary classes for testing Z-ordering frontend features.
+# These classes are based on tests/metadata/test_ddl.py and
+# tests/metadata/test_show_create_table.py.
+# TODO: merge the test files and delete these tests once the Z-order tests no longer
+# require a custom cluster.
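+# Every test below starts impalad with --unlock_zorder_sort=true (see the with_args
+# decorators), since the ZORDER syntax is hidden behind that feature flag.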
+class TestZOrder(CustomClusterTestSuite):
+  VALID_SECTION_NAMES = ["CREATE_TABLE", "CREATE_VIEW", "QUERY", "RESULTS"]
+
+  @classmethod
+  def get_workload(cls):
+    return 'functional-query'
+
+  @classmethod
+  def add_test_dimensions(cls):
+    super(TestZOrder, cls).add_test_dimensions()
+
+  @CustomClusterTestSuite.with_args(impalad_args="--unlock_zorder_sort=true")
+  @SkipIfLocal.hdfs_client
+  @UniqueDatabase.parametrize(sync_ddl=True, num_dbs=2)
+  def test_alter_table(self, vector, unique_database):
+    vector.get_value('exec_option')['abort_on_error'] = False
+
+    # Create an unpartitioned table to get a filesystem directory that does not
+    # use the (key=value) format. The directory is automatically cleaned up
+    # by the unique_database fixture.
+    self.client.execute("create table {0}.part_data (i int)".format(unique_database))
+    assert self.filesystem_client.exists(
+        "test-warehouse/{0}.db/part_data".format(unique_database))
+    self.filesystem_client.create_file(
+        "test-warehouse/{0}.db/part_data/data.txt".format(unique_database),
+        file_data='1984')
+    self.run_test_case('QueryTest/alter-table-zorder', vector, use_db=unique_database)
+
+  @CustomClusterTestSuite.with_args(impalad_args="--unlock_zorder_sort=true")
+  @UniqueDatabase.parametrize(sync_ddl=True)
+  def test_create_table(self, vector, unique_database):
+    vector.get_value('exec_option')['abort_on_error'] = False
+    self.run_test_case('QueryTest/create-table-zorder', vector, use_db=unique_database)
+
+  @CustomClusterTestSuite.with_args(impalad_args="--unlock_zorder_sort=true")
+  @UniqueDatabase.parametrize(sync_ddl=True)
+  def test_create_table_like_table(self, vector, unique_database):
+    vector.get_value('exec_option')['abort_on_error'] = False
+    self.run_test_case('QueryTest/create-table-like-table-zorder', vector,
+        use_db=unique_database)
+
+  @CustomClusterTestSuite.with_args(impalad_args="--unlock_zorder_sort=true")
+  @UniqueDatabase.parametrize(sync_ddl=True)
+  def test_create_table_like_file(self, vector, unique_database):
+    vector.get_value('exec_option')['abort_on_error'] = False
+    self.run_test_case('QueryTest/create-table-like-file-zorder', vector,
+        use_db=unique_database)
+
+  @CustomClusterTestSuite.with_args(impalad_args="--unlock_zorder_sort=true")
+  @UniqueDatabase.parametrize(sync_ddl=True)
+  def test_create_table_as_select(self, vector, unique_database):
+    vector.get_value('exec_option')['abort_on_error'] = False
+    self.run_test_case('QueryTest/create-table-as-select-zorder', vector,
+        use_db=unique_database)
+
+
+# The purpose of the show create table tests is to ensure that the "SHOW CREATE TABLE"
+# output can actually be used to recreate the table. A test consists of a table
+# definition. The table is created, then the output of "SHOW CREATE TABLE" is used to
+# test if the table can be recreated. This test class does not support --update-results.
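+# For each test case the flow is: run the CREATE statement, capture and normalize the
+# SHOW CREATE TABLE output, compare it against the expected RESULTS section, then
+# recreate the table from that output and verify the SHOW CREATE TABLE result again.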
+class TestZOrderShowCreateTable(CustomClusterTestSuite):
+  VALID_SECTION_NAMES = ["CREATE_TABLE", "CREATE_VIEW", "QUERY", "RESULTS"]
+  # Properties to filter before comparing results
+  FILTER_TBL_PROPERTIES = ["transient_lastDdlTime", "numFiles", "numPartitions",
+                           "numRows", "rawDataSize", "totalSize", "COLUMN_STATS_ACCURATE",
+                           "STATS_GENERATED_VIA_STATS_TASK", "last_modified_by",
+                           "last_modified_time", "numFilesErasureCoded",
+                           "bucketing_version"]
+
+  @classmethod
+  def get_workload(cls):
+    return 'functional-query'
+
+  @classmethod
+  def add_test_dimensions(cls):
+    super(TestZOrderShowCreateTable, cls).add_test_dimensions()
+    # There is no reason to run these tests using all dimensions.
+    cls.ImpalaTestMatrix.add_dimension(
+        create_uncompressed_text_dimension(cls.get_workload()))
+    cls.ImpalaTestMatrix.add_constraint(
+        lambda v: v.get_value('table_format').file_format == 'text' and
+        v.get_value('table_format').compression_codec == 'none')
+
+  @CustomClusterTestSuite.with_args(impalad_args="--unlock_zorder_sort=true")
+  def test_show_create_table(self, vector, unique_database):
+    self.__run_show_create_table_test_case('QueryTest/show-create-table-zorder', vector,
+                                           unique_database)
+
+  def __run_show_create_table_test_case(self, test_file_name, vector, unique_db_name):
+    """
+    Runs a show-create-table test file, containing the following sections:
+
+    ---- CREATE_TABLE
+    contains a table creation statement to create table TABLE_NAME
+    ---- RESULTS
+    contains the expected result of SHOW CREATE TABLE table_name
+
+    OR
+
+    ---- CREATE_VIEW
+    contains a view creation statement to create view VIEW_NAME
+    ---- RESULTS
+    contains the expected result of SHOW CREATE VIEW table_name
+
+    OR
+
+    ---- QUERY
+    a show create table query
+    ---- RESULTS
+    contains the expected output of the SHOW CREATE TABLE query
+
+    unique_db_name is the name of the database to use for all tables and
+    views and must be unique so as not to conflict with other tests.
+    """
+    sections = self.load_query_test_file(self.get_workload(), test_file_name,
+                                         self.VALID_SECTION_NAMES)
+    for test_section in sections:
+      test_case = ShowCreateTableTestCase(test_section, test_file_name, unique_db_name)
+
+      if not test_case.existing_table:
+        # create table in Impala
+        self.__exec(test_case.create_table_sql)
+      # execute "SHOW CREATE TABLE ..."
+      result = self.__exec(test_case.show_create_table_sql)
+      create_table_result = self.__normalize(result.data[0])
+
+      if not test_case.existing_table:
+        # drop the table
+        self.__exec(test_case.drop_table_sql)
+
+      # check the result matches the expected result
+      expected_result = self.__normalize(self.__replace_uri(
+          test_case.expected_result,
+          self.__get_location_uri(create_table_result)))
+      self.__compare_result(expected_result, create_table_result)
+
+      if test_case.existing_table:
+        continue
+
+      # recreate the table with the result from above
+      self.__exec(create_table_result)
+      try:
+        # we should get the same result from "show create table ..."
+        result = self.__exec(test_case.show_create_table_sql)
+        new_create_table_result = self.__normalize(result.data[0])
+        assert create_table_result == new_create_table_result
+      finally:
+        # drop the table
+        self.__exec(test_case.drop_table_sql)
+
+  def __exec(self, sql_str):
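+    # Runs the statement and fails the test immediately if it does not succeed.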
+    return self.execute_query_expect_success(self.client, sql_str)
+
+  def __get_location_uri(self, sql_str):
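+    # Extracts the LOCATION URI from a SHOW CREATE TABLE result; returns None if the
+    # statement has no LOCATION clause.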
+    m = re.search("LOCATION '([^\']+)'", sql_str)
+    if m is not None:
+      return m.group(1)
+
+  def __compare_result(self, expected_sql, actual_sql):
+    """ Extract all properties """
+    expected_tbl_props = self.__get_properties_map(expected_sql, "TBLPROPERTIES")
+    actual_tbl_props = self.__get_properties_map(actual_sql, "TBLPROPERTIES")
+    assert expected_tbl_props == actual_tbl_props
+
+    expected_serde_props = self.__get_properties_map(expected_sql, "SERDEPROPERTIES")
+    actual_serde_props = self.__get_properties_map(actual_sql, "SERDEPROPERTIES")
+    assert expected_serde_props == actual_serde_props
+
+    expected_sql_filtered = self.__remove_properties_maps(expected_sql)
+    actual_sql_filtered = self.__remove_properties_maps(actual_sql)
+    assert expected_sql_filtered == actual_sql_filtered
+
+  def __normalize(self, s):
+    """ Normalize the string to remove extra whitespaces and remove keys
+    from tblproperties and serdeproperties that we don't want
+    """
+    s = ' '.join(s.split())
+    for k in self.FILTER_TBL_PROPERTIES:
+      kv_regex = "'%s'\s*=\s*'[^\']+'\s*,?" % (k)
+      s = re.sub(kv_regex, "", s)
+    # If we removed the last property, there will be a dangling comma that is not valid
+    # e.g. 'k1'='v1', ) -> 'k1'='v1')
+    s = re.sub(",\s*\)", ")", s)
+    # Need to remove any whitespace after left parens and before right parens
+    s = re.sub("\(\s+", "(", s)
+    s = re.sub("\s+\)", ")", s)
+    # If the only properties were removed, the properties sections may be empty, which
+    # is not valid
+    s = re.sub("TBLPROPERTIES\s*\(\s*\)", "", s)
+    s = re.sub("SERDEPROPERTIES\s*\(\s*\)", "", s)
+    return s
+
+  def __properties_map_regex(self, name):
+    return "%s \(([^)]+)\)" % name
+
+  def __remove_properties_maps(self, s):
+    """ Removes the tblproperties and serdeproperties from the string """
+    return re.sub(
+        self.__properties_map_regex("WITH SERDEPROPERTIES"), "",
+        re.sub(self.__properties_map_regex("TBLPROPERTIES"), "", s)).strip()
+
+  def __get_properties_map(self, s, properties_map_name):
+    """ Extracts a dict of key-value pairs from the sql string s. The properties_map_name
+    is the name of the properties map, e.g. 'tblproperties' or 'serdeproperties'
+    """
+    map_match = re.search(self.__properties_map_regex(properties_map_name), s)
+    if map_match is None:
+      return dict()
+    kv_regex = "'([^\']+)'\s*=\s*'([^\']+)'"
+    kv_results = dict(re.findall(kv_regex, map_match.group(1)))
+    for filtered_key in self.FILTER_TBL_PROPERTIES:
+      if filtered_key in kv_results:
+        del kv_results[filtered_key]
+    return kv_results
+
+  def __replace_uri(self, s, uri):
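+    # The expected RESULTS use the $$location_uri$$ placeholder, which is replaced here
+    # with the LOCATION taken from the actual SHOW CREATE TABLE output.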
+    return s if uri is None else s.replace("$$location_uri$$", uri)
+
+
+# Represents one show-create-table test case. Performs validation of the test sections
+# and provides SQL to execute for each section.
+class ShowCreateTableTestCase(object):
+  RESULTS_DB_NAME_TOKEN = "show_create_table_test_db"
+
+  def __init__(self, test_section, test_file_name, test_db_name):
+    if 'QUERY' in test_section:
+      self.existing_table = True
+      self.show_create_table_sql = remove_comments(test_section['QUERY']).strip()
+    elif 'CREATE_TABLE' in test_section:
+      self.__process_create_section(
+          test_section['CREATE_TABLE'], test_file_name, test_db_name, 'table')
+    elif 'CREATE_VIEW' in test_section:
+      self.__process_create_section(
+          test_section['CREATE_VIEW'], test_file_name, test_db_name, 'view')
+    else:
+      assert 0, 'Error in test file %s. Test cases require a CREATE_TABLE, '\
+          'CREATE_VIEW or QUERY section.\n%s' %\
+          (test_file_name, pprint.pformat(test_section))
+    expected_result = remove_comments(test_section['RESULTS'])
+    self.expected_result = expected_result.replace(
+        ShowCreateTableTestCase.RESULTS_DB_NAME_TOKEN, test_db_name)
+
+  def __process_create_section(self, section, test_file_name, test_db_name, table_type):
+    self.existing_table = False
+    self.create_table_sql = QueryTestSectionReader.build_query(remove_comments(section))
+    name = self.__get_table_name(self.create_table_sql, table_type)
+    assert name.find(".") == -1, 'Error in test file %s. Found unexpected %s '\
+        'name %s that is qualified with a database' % (test_file_name, table_type, name)
+    self.table_name = test_db_name + '.' + name
+    self.create_table_sql = self.create_table_sql.replace(name, self.table_name, 1)
+    self.show_create_table_sql = 'show create %s %s' % (table_type, self.table_name)
+    self.drop_table_sql = "drop %s %s" % (table_type, self.table_name)
+
+  def __get_table_name(self, create_table_sql, table_type):
+    lexer = shlex.shlex(create_table_sql)
+    tokens = list(lexer)
+    # sanity check the create table statement
+    if len(tokens) < 3 or tokens[0].lower() != "create":
+      assert 0, 'Error in test. Invalid CREATE TABLE statement: %s' % (create_table_sql)
+    if tokens[1].lower() != table_type.lower() and \
+       (tokens[1].lower() != "external" or tokens[2].lower() != table_type.lower()):
+      assert 0, 'Error in test. Invalid CREATE TABLE statement: %s' % (create_table_sql)
+
+    if tokens[1].lower() == "external":
+      # expect "create external table table_name ..."
+      return tokens[3]
+    else:
+      # expect "create table table_name ..."
+      return tokens[2]