You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by lv...@apache.org on 2018/02/04 18:46:29 UTC

[4/6] impala git commit: IMPALA-5293: Turn insert clustering on by default

IMPALA-5293: Turn insert clustering on by default

This change enables clustering by default. IMPALA-2521 introduced the
'clustered' hint which inserts a local sort by the partitioning columns
to a query plan. The hint is only effective for HDFS and Kudu tables.

Like before, the 'noclustered' hint prevents clustering. If a table has
ordering columns defined, the 'noclustered' hint is ignored and we
issue a warning.

This change removes some tests that were added specifically to test
that clustering can be enabled using the 'clustered' hint. It changes
some tests to use the 'noclustered' hint to make sure that clustering
can be disabled. It also adds tests to make sure that we cover the
'noclustered' case properly.

Cherry-picks: not for 2.x.

Change-Id: Idbf2368cf4415e6ecfa65058daf6ff87ef62f9d9
Reviewed-on: http://gerrit.cloudera.org:8080/9153
Reviewed-by: Lars Volker <lv...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/fc529b7f
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/fc529b7f
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/fc529b7f

Branch: refs/heads/master
Commit: fc529b7f9fce9d1bc1f6da2952380bad3dea0b11
Parents: a493a01
Author: Lars Volker <lv...@cloudera.com>
Authored: Mon Jan 29 11:29:01 2018 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Sat Feb 3 05:58:50 2018 +0000

----------------------------------------------------------------------
 .../org/apache/impala/analysis/InsertStmt.java  |  14 ++-
 .../java/org/apache/impala/planner/Planner.java |   5 +-
 .../impala/analysis/AnalyzeStmtsTest.java       |   1 +
 .../queries/PlannerTest/constant-folding.test   |   7 +-
 .../queries/PlannerTest/empty.test              |  15 ++-
 .../queries/PlannerTest/insert.test             | 121 ++++++++++++++++---
 .../queries/PlannerTest/kudu.test               |  15 +--
 .../queries/PlannerTest/order.test              |  25 ++++
 .../PlannerTest/resource-requirements.test      |  33 +++--
 .../queries/PlannerTest/with-clause.test        |  47 ++++++-
 .../queries/QueryTest/insert.test               |  28 ++---
 .../queries/QueryTest/stats-extrapolation.test  |   3 +-
 12 files changed, 248 insertions(+), 66 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/fc529b7f/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java b/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
index 462728a..1d361d6 100644
--- a/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
@@ -141,9 +141,8 @@ public class InsertStmt extends StatementBase {
   // Indicates whether this insert stmt has a clustered or noclustered hint. Only one of
   // them may be true, not both. If clustering is requested, we add a clustering phase
   // before the data sink, so that partitions can be written sequentially. The default
-  // behavior is to not perform an additional clustering step.
-  // TODO: hasClusteredHint_ can be removed once we enable clustering by default
-  // (IMPALA-5293).
+  // behavior is to not perform an additional clustering step. Both are required to detect
+  // conflicting hints.
   private boolean hasClusteredHint_ = false;
   private boolean hasNoClusteredHint_ = false;
 
@@ -877,6 +876,13 @@ public class InsertStmt extends StatementBase {
   public ArrayList<Expr> getPrimaryKeyExprs() { return primaryKeyExprs_; }
   public List<Expr> getSortExprs() { return sortExprs_; }
 
+  // Clustering is enabled by default. If the table has a 'sort.columns' property and the
+  // query has a 'noclustered' hint, we issue a warning during analysis and ignore the
+  // 'noclustered' hint.
+  public boolean requiresClustering() {
+    return !hasNoClusteredHint_ || !sortExprs_.isEmpty();
+  }
+
   public List<String> getMentionedColumns() {
     List<String> result = Lists.newArrayList();
     List<Column> columns = table_.getColumns();
@@ -888,7 +894,7 @@ public class InsertStmt extends StatementBase {
     // analyze() must have been called before.
     Preconditions.checkState(table_ != null);
     return TableSink.create(table_, isUpsert_ ? TableSink.Op.UPSERT : TableSink.Op.INSERT,
-        partitionKeyExprs_, mentionedColumns_, overwrite_, hasClusteredHint_,
+        partitionKeyExprs_, mentionedColumns_, overwrite_, requiresClustering(),
         sortColumns_);
   }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/fc529b7f/fe/src/main/java/org/apache/impala/planner/Planner.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/Planner.java b/fe/src/main/java/org/apache/impala/planner/Planner.java
index 71182f5..fdae919 100644
--- a/fe/src/main/java/org/apache/impala/planner/Planner.java
+++ b/fe/src/main/java/org/apache/impala/planner/Planner.java
@@ -622,10 +622,7 @@ public class Planner {
         orderingExprs.addAll(insertStmt.getPrimaryKeyExprs());
         partialSort = true;
       }
-    } else if (insertStmt.hasClusteredHint() || !insertStmt.getSortExprs().isEmpty()) {
-      // NOTE: If the table has a 'sort.columns' property and the query has a
-      // 'noclustered' hint, we issue a warning during analysis and ignore the
-      // 'noclustered' hint.
+    } else if (insertStmt.requiresClustering()) {
       orderingExprs.addAll(insertStmt.getPartitionKeyExprs());
     }
     orderingExprs.addAll(insertStmt.getSortExprs());

http://git-wip-us.apache.org/repos/asf/impala/blob/fc529b7f/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java b/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java
index 65639b2..b4c2e43 100644
--- a/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java
@@ -1897,6 +1897,7 @@ public class AnalyzeStmtsTest extends AnalyzerTest {
       AnalyzesOk(String.format(
           "insert into table functional.alltypesnopart %sclustered%s " +
           "select * from functional.alltypesnopart", prefix, suffix));
+      // Test that noclustered is accepted.
       AnalyzesOk(String.format(
           "insert into table functional.alltypesnopart %snoclustered%s " +
           "select * from functional.alltypesnopart", prefix, suffix));

http://git-wip-us.apache.org/repos/asf/impala/blob/fc529b7f/testdata/workloads/functional-planner/queries/PlannerTest/constant-folding.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/constant-folding.test b/testdata/workloads/functional-planner/queries/PlannerTest/constant-folding.test
index 37b3163..59980b5 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/constant-folding.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/constant-folding.test
@@ -341,11 +341,16 @@ select id, int_col, cast(1 + 1 + 1 + year as int), cast(month - (1 - 1 - 1) as i
 from functional.alltypessmall
 ---- PLAN
 F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
-|  Per-Host Resources: mem-estimate=32.00MB mem-reservation=0B
+|  Per-Host Resources: mem-estimate=38.00MB mem-reservation=6.00MB
 WRITE TO HDFS [functional.alltypes, OVERWRITE=false, PARTITION-KEYS=(CAST(3 + year AS INT),CAST(month - -1 AS INT))]
 |  partitions=4
 |  mem-estimate=1.56KB mem-reservation=0B
 |
+01:SORT
+|  order by: CAST(3 + year AS INT) ASC NULLS LAST, CAST(month - -1 AS INT) ASC NULLS LAST
+|  mem-estimate=6.00MB mem-reservation=6.00MB spill-buffer=2.00MB
+|  tuple-ids=1 row-size=16B cardinality=100
+|
 00:SCAN HDFS [functional.alltypessmall]
    partitions=4/4 files=4 size=6.32KB
    stored statistics:

http://git-wip-us.apache.org/repos/asf/impala/blob/fc529b7f/testdata/workloads/functional-planner/queries/PlannerTest/empty.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/empty.test b/testdata/workloads/functional-planner/queries/PlannerTest/empty.test
index c85d8ec..43d1fcf 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/empty.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/empty.test
@@ -302,9 +302,12 @@ PLAN-ROOT SINK
 insert into functional.alltypes partition(year, month)
 select * from functional.alltypes where 1 = 0
 ---- PLAN
-WRITE TO HDFS [functional.alltypes, OVERWRITE=false, PARTITION-KEYS=(functional.alltypes.year,functional.alltypes.month)]
+WRITE TO HDFS [functional.alltypes, OVERWRITE=false, PARTITION-KEYS=(year,month)]
 |  partitions=24
 |
+01:SORT
+|  order by: year ASC NULLS LAST, month ASC NULLS LAST
+|
 00:EMPTYSET
 ====
 # IMPALA-1860: INSERT/CTAS should evaluate and apply constant predicates.
@@ -312,9 +315,12 @@ with t as (select * from functional.alltypes where coalesce(NULL) > 10)
 insert into functional.alltypes partition(year, month)
 select * from t
 ---- PLAN
-WRITE TO HDFS [functional.alltypes, OVERWRITE=false, PARTITION-KEYS=(functional.alltypes.year,functional.alltypes.month)]
+WRITE TO HDFS [functional.alltypes, OVERWRITE=false, PARTITION-KEYS=(year,month)]
 |  partitions=24
 |
+01:SORT
+|  order by: year ASC NULLS LAST, month ASC NULLS LAST
+|
 00:EMPTYSET
 ====
 # IMPALA-1860: INSERT/CTAS should evaluate and apply constant predicates.
@@ -499,7 +505,7 @@ PLAN-ROOT SINK
 |  03:EMPTYSET
 |
 00:SCAN HDFS [tpch_nested_parquet.customer c]
-   partitions=1/1 files=4 size=577.87MB
+   partitions=1/1 files=4 size=292.36MB
    predicates: c_custkey < 10
 ====
 # IMPALA-2539: Test empty union operands containing relative table refs.
@@ -533,7 +539,7 @@ PLAN-ROOT SINK
 |  04:UNNEST [c.c_orders o1]
 |
 00:SCAN HDFS [tpch_nested_parquet.customer c]
-   partitions=1/1 files=4 size=577.87MB
+   partitions=1/1 files=4 size=292.36MB
    predicates: c_custkey = 1
 ====
 # IMPALA-2215: Having clause without aggregation.
@@ -550,7 +556,6 @@ PLAN-ROOT SINK
 |
 00:EMPTYSET
 ====
----- QUERY
 # IMPALA-5812: Test that a cross join with a constant select that returns an empty result
 # set translates into an EMPTYSET in the final plan
 select count(*) from functional.alltypes x cross join (select 1 as j) y where j is null

http://git-wip-us.apache.org/repos/asf/impala/blob/fc529b7f/testdata/workloads/functional-planner/queries/PlannerTest/insert.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/insert.test b/testdata/workloads/functional-planner/queries/PlannerTest/insert.test
index 522b058..ea0ec2a 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/insert.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/insert.test
@@ -77,6 +77,38 @@ where year=2009 and month>10
 WRITE TO HDFS [functional.alltypessmall, OVERWRITE=false, PARTITION-KEYS=(year,month)]
 |  partitions=24
 |
+01:SORT
+|  order by: year ASC NULLS LAST, month ASC NULLS LAST
+|
+00:SCAN HDFS [functional.alltypes]
+   partitions=2/24 files=2 size=40.07KB
+---- SCANRANGELOCATIONS
+NODE 0:
+  HDFS SPLIT hdfs://localhost:20500/test-warehouse/alltypes/year=2009/month=11/091101.txt 0:20179
+  HDFS SPLIT hdfs://localhost:20500/test-warehouse/alltypes/year=2009/month=12/091201.txt 0:20853
+---- DISTRIBUTEDPLAN
+WRITE TO HDFS [functional.alltypessmall, OVERWRITE=false, PARTITION-KEYS=(year,month)]
+|  partitions=24
+|
+02:SORT
+|  order by: year ASC NULLS LAST, month ASC NULLS LAST
+|
+01:EXCHANGE [HASH(year,month)]
+|
+00:SCAN HDFS [functional.alltypes]
+   partitions=2/24 files=2 size=40.07KB
+====
+# IMPALA-5293: noclustered hint prevents adding sort node
+insert into table functional.alltypessmall
+partition (year, month) /* +noclustered */
+select id, bool_col, tinyint_col, smallint_col, int_col, bigint_col,
+float_col, double_col, date_string_col, string_col, timestamp_col, year, month
+from functional.alltypes
+where year=2009 and month>10
+---- PLAN
+WRITE TO HDFS [functional.alltypessmall, OVERWRITE=false, PARTITION-KEYS=(year,month)]
+|  partitions=24
+|
 00:SCAN HDFS [functional.alltypes]
    partitions=2/24 files=2 size=40.07KB
 ---- SCANRANGELOCATIONS
@@ -104,12 +136,18 @@ where year=2009 and month>10
 WRITE TO HDFS [functional.alltypessmall, OVERWRITE=false, PARTITION-KEYS=(int_col,int_col)]
 |  partitions=unavailable
 |
+01:SORT
+|  order by: int_col ASC NULLS LAST, int_col ASC NULLS LAST
+|
 00:SCAN HDFS [functional_seq_snap.alltypes]
    partitions=2/24 files=2 size=11.34KB
 ---- DISTRIBUTEDPLAN
 WRITE TO HDFS [functional.alltypessmall, OVERWRITE=false, PARTITION-KEYS=(int_col,int_col)]
 |  partitions=unavailable
 |
+02:SORT
+|  order by: int_col ASC NULLS LAST, int_col ASC NULLS LAST
+|
 01:EXCHANGE [HASH(int_col,int_col)]
 |
 00:SCAN HDFS [functional_seq_snap.alltypes]
@@ -129,6 +167,9 @@ group by year, month
 WRITE TO HDFS [functional.alltypessmall, OVERWRITE=false, PARTITION-KEYS=(year,month)]
 |  partitions=24
 |
+02:SORT
+|  order by: year ASC NULLS LAST, month ASC NULLS LAST
+|
 01:AGGREGATE [FINALIZE]
 |  output: min(id), min(bool_col), min(tinyint_col), min(smallint_col), min(int_col), min(bigint_col), min(float_col), min(double_col), min(date_string_col), min(string_col), min(timestamp_col)
 |  group by: year, month
@@ -143,6 +184,9 @@ NODE 0:
 WRITE TO HDFS [functional.alltypessmall, OVERWRITE=false, PARTITION-KEYS=(year,month)]
 |  partitions=24
 |
+04:SORT
+|  order by: year ASC NULLS LAST, month ASC NULLS LAST
+|
 03:AGGREGATE [FINALIZE]
 |  output: min:merge(id), min:merge(bool_col), min:merge(tinyint_col), min:merge(smallint_col), min:merge(int_col), min:merge(bigint_col), min:merge(float_col), min:merge(double_col), min:merge(date_string_col), min:merge(string_col), min:merge(timestamp_col)
 |  group by: year, month
@@ -167,6 +211,9 @@ where year=2009 and month>10
 WRITE TO HDFS [functional.alltypessmall, OVERWRITE=false, PARTITION-KEYS=(2009,month)]
 |  partitions=12
 |
+01:SORT
+|  order by: month ASC NULLS LAST
+|
 00:SCAN HDFS [functional.alltypes]
    partitions=2/24 files=2 size=40.07KB
 ---- SCANRANGELOCATIONS
@@ -177,6 +224,9 @@ NODE 0:
 WRITE TO HDFS [functional.alltypessmall, OVERWRITE=false, PARTITION-KEYS=(2009,month)]
 |  partitions=12
 |
+02:SORT
+|  order by: month ASC NULLS LAST
+|
 01:EXCHANGE [HASH(month)]
 |
 00:SCAN HDFS [functional.alltypes]
@@ -196,6 +246,9 @@ group by month
 WRITE TO HDFS [functional.alltypessmall, OVERWRITE=false, PARTITION-KEYS=(2009,month)]
 |  partitions=12
 |
+02:SORT
+|  order by: month ASC NULLS LAST
+|
 01:AGGREGATE [FINALIZE]
 |  output: min(id), min(bool_col), min(tinyint_col), min(smallint_col), min(int_col), min(bigint_col), min(float_col), min(double_col), min(date_string_col), min(string_col), min(timestamp_col)
 |  group by: month
@@ -210,6 +263,9 @@ NODE 0:
 WRITE TO HDFS [functional.alltypessmall, OVERWRITE=false, PARTITION-KEYS=(2009,month)]
 |  partitions=12
 |
+04:SORT
+|  order by: month ASC NULLS LAST
+|
 03:AGGREGATE [FINALIZE]
 |  output: min:merge(id), min:merge(bool_col), min:merge(tinyint_col), min:merge(smallint_col), min:merge(int_col), min:merge(bigint_col), min:merge(float_col), min:merge(double_col), min:merge(date_string_col), min:merge(string_col), min:merge(timestamp_col)
 |  group by: month
@@ -234,6 +290,9 @@ where year>2009 and month=4
 WRITE TO HDFS [functional.alltypessmall, OVERWRITE=false, PARTITION-KEYS=(year,4)]
 |  partitions=2
 |
+01:SORT
+|  order by: year ASC NULLS LAST
+|
 00:SCAN HDFS [functional.alltypes]
    partitions=1/24 files=1 size=19.71KB
 ---- SCANRANGELOCATIONS
@@ -243,6 +302,9 @@ NODE 0:
 WRITE TO HDFS [functional.alltypessmall, OVERWRITE=false, PARTITION-KEYS=(year,4)]
 |  partitions=2
 |
+01:SORT
+|  order by: year ASC NULLS LAST
+|
 00:SCAN HDFS [functional.alltypes]
    partitions=1/24 files=1 size=19.71KB
 ====
@@ -337,12 +399,18 @@ partition (year, month) values
 WRITE TO HDFS [functional.alltypessmall, OVERWRITE=false, PARTITION-KEYS=(2010,4)]
 |  partitions=9
 |
+01:SORT
+|  order by: 2010 ASC NULLS LAST, 4 ASC NULLS LAST
+|
 00:UNION
    constant-operands=3
 ---- DISTRIBUTEDPLAN
 WRITE TO HDFS [functional.alltypessmall, OVERWRITE=false, PARTITION-KEYS=(2010,4)]
 |  partitions=9
 |
+01:SORT
+|  order by: 2010 ASC NULLS LAST, 4 ASC NULLS LAST
+|
 00:UNION
    constant-operands=3
 ====
@@ -429,9 +497,12 @@ WRITE TO HDFS [functional.alltypes, OVERWRITE=false, PARTITION-KEYS=(2010,10)]
 insert into table functional.alltypes partition(year, month)
 select * from functional.alltypes
 ---- DISTRIBUTEDPLAN
-WRITE TO HDFS [functional.alltypes, OVERWRITE=false, PARTITION-KEYS=(functional.alltypes.year,functional.alltypes.month)]
+WRITE TO HDFS [functional.alltypes, OVERWRITE=false, PARTITION-KEYS=(year,month)]
 |  partitions=24
 |
+02:SORT
+|  order by: year ASC NULLS LAST, month ASC NULLS LAST
+|
 01:EXCHANGE [HASH(functional.alltypes.year,functional.alltypes.month)]
 |
 00:SCAN HDFS [functional.alltypes]
@@ -441,9 +512,12 @@ WRITE TO HDFS [functional.alltypes, OVERWRITE=false, PARTITION-KEYS=(functional.
 insert into table functional.alltypes partition(year, month) [noshuffle]
 select * from functional.alltypes
 ---- DISTRIBUTEDPLAN
-WRITE TO HDFS [functional.alltypes, OVERWRITE=false, PARTITION-KEYS=(functional.alltypes.year,functional.alltypes.month)]
+WRITE TO HDFS [functional.alltypes, OVERWRITE=false, PARTITION-KEYS=(year,month)]
 |  partitions=24
 |
+01:SORT
+|  order by: year ASC NULLS LAST, month ASC NULLS LAST
+|
 00:SCAN HDFS [functional.alltypes]
    partitions=24/24 files=24 size=478.45KB
 ====
@@ -451,9 +525,12 @@ WRITE TO HDFS [functional.alltypes, OVERWRITE=false, PARTITION-KEYS=(functional.
 insert into table functional.alltypes partition(year, month) /* +noshuffle */
 select * from functional.alltypes
 ---- DISTRIBUTEDPLAN
-WRITE TO HDFS [functional.alltypes, OVERWRITE=false, PARTITION-KEYS=(functional.alltypes.year,functional.alltypes.month)]
+WRITE TO HDFS [functional.alltypes, OVERWRITE=false, PARTITION-KEYS=(year,month)]
 |  partitions=24
 |
+01:SORT
+|  order by: year ASC NULLS LAST, month ASC NULLS LAST
+|
 00:SCAN HDFS [functional.alltypes]
    partitions=24/24 files=24 size=478.45KB
 ====
@@ -461,9 +538,12 @@ WRITE TO HDFS [functional.alltypes, OVERWRITE=false, PARTITION-KEYS=(functional.
 insert /* +noshuffle */ into table functional.alltypes partition(year, month)
 select * from functional.alltypes
 ---- DISTRIBUTEDPLAN
-WRITE TO HDFS [functional.alltypes, OVERWRITE=false, PARTITION-KEYS=(functional.alltypes.year,functional.alltypes.month)]
+WRITE TO HDFS [functional.alltypes, OVERWRITE=false, PARTITION-KEYS=(year,month)]
 |  partitions=24
 |
+01:SORT
+|  order by: year ASC NULLS LAST, month ASC NULLS LAST
+|
 00:SCAN HDFS [functional.alltypes]
    partitions=24/24 files=24 size=478.45KB
 ====
@@ -472,9 +552,12 @@ insert into table functional.alltypes partition(year, month)
 -- +noshuffle
 select * from functional.alltypes
 ---- DISTRIBUTEDPLAN
-WRITE TO HDFS [functional.alltypes, OVERWRITE=false, PARTITION-KEYS=(functional.alltypes.year,functional.alltypes.month)]
+WRITE TO HDFS [functional.alltypes, OVERWRITE=false, PARTITION-KEYS=(year,month)]
 |  partitions=24
 |
+01:SORT
+|  order by: year ASC NULLS LAST, month ASC NULLS LAST
+|
 00:SCAN HDFS [functional.alltypes]
    partitions=24/24 files=24 size=478.45KB
 ====
@@ -489,6 +572,9 @@ from functional.alltypes
 WRITE TO HDFS [functional.alltypes, OVERWRITE=false, PARTITION-KEYS=(year,1)]
 |  partitions=2
 |
+01:SORT
+|  order by: year ASC NULLS LAST
+|
 00:SCAN HDFS [functional.alltypes]
    partitions=24/24 files=24 size=478.45KB
 ====
@@ -501,6 +587,9 @@ from functional.alltypes
 WRITE TO HDFS [functional.alltypes, OVERWRITE=false, PARTITION-KEYS=(year,1)]
 |  partitions=2
 |
+02:SORT
+|  order by: year ASC NULLS LAST
+|
 01:EXCHANGE [HASH(year)]
 |
 00:SCAN HDFS [functional.alltypes]
@@ -586,8 +675,8 @@ WRITE TO HDFS [functional.alltypesnopart, OVERWRITE=false]
 00:SCAN HDFS [functional.alltypes]
    partitions=24/24 files=24 size=478.45KB
 ====
-# IMPALA-2521: clustered insert into partitioned table adds sort node.
-insert into table functional.alltypes partition(year, month) /*+ clustered */
+# IMPALA-5293: ensure insert into partitioned table adds sort node without clustered hint.
+insert into table functional.alltypes partition(year, month)
 select * from functional.alltypes
 ---- PLAN
 WRITE TO HDFS [functional.alltypes, OVERWRITE=false, PARTITION-KEYS=(year,month)]
@@ -610,8 +699,8 @@ WRITE TO HDFS [functional.alltypes, OVERWRITE=false, PARTITION-KEYS=(year,month)
 00:SCAN HDFS [functional.alltypes]
    partitions=24/24 files=24 size=478.45KB
 ====
-# IMPALA-2521: clustered insert into partitioned table adds sort node.
-insert into table functional.alltypes partition(year, month) /*+ clustered,noshuffle */
+# IMPALA-5293: ensure insert into partitioned table adds sort node without clustered hint.
+insert into table functional.alltypes partition(year, month) /*+ noshuffle */
 select * from functional.alltypes
 ---- PLAN
 WRITE TO HDFS [functional.alltypes, OVERWRITE=false, PARTITION-KEYS=(year,month)]
@@ -632,9 +721,9 @@ WRITE TO HDFS [functional.alltypes, OVERWRITE=false, PARTITION-KEYS=(year,month)
 00:SCAN HDFS [functional.alltypes]
    partitions=24/24 files=24 size=478.45KB
 ====
-# IMPALA-2521: clustered insert into partitioned table adds sort node. Subquery in
-# WHERE-clause exercises the reset() + analyze() path during rewrite.
-insert into table functional.alltypes partition(year, month) /*+ clustered */
+# IMPALA-5293: ensure insert into partitioned table adds sort node without clustered hint.
+# Subquery in WHERE-clause exercises the reset() + analyze() path during rewrite.
+insert into table functional.alltypes partition(year, month)
 select * from functional.alltypes
 where int_col = (select max(int_col) from functional.alltypes)
 ---- PLAN
@@ -687,8 +776,8 @@ WRITE TO HDFS [functional.alltypes, OVERWRITE=false, PARTITION-KEYS=(year,month)
    partitions=24/24 files=24 size=478.45KB
    runtime filters: RF000 -> int_col
 ====
-# IMPALA-2521: clustered insert into non-partitioned table does not add sort node.
-insert into table functional.alltypesnopart /*+ clustered */
+# IMPALA-5293: ensure insert into non-partitioned table does not add sort node.
+insert into table functional.alltypesnopart
 select * from functional.alltypesnopart
 ---- PLAN
 WRITE TO HDFS [functional.alltypesnopart, OVERWRITE=false]
@@ -703,8 +792,8 @@ WRITE TO HDFS [functional.alltypesnopart, OVERWRITE=false]
 00:SCAN HDFS [functional.alltypesnopart]
    partitions=1/1 files=0 size=0B
 ====
-# IMPALA-2521: clustered insert into non-partitioned table does not add sort node.
-insert into table functional.alltypesnopart /*+ clustered,shuffle */
+# IMPALA-5293: ensure insert into non-partitioned table does not add sort node.
+insert into table functional.alltypesnopart /*+ shuffle */
 select * from functional.alltypesnopart
 ---- PLAN
 WRITE TO HDFS [functional.alltypesnopart, OVERWRITE=false]

http://git-wip-us.apache.org/repos/asf/impala/blob/fc529b7f/testdata/workloads/functional-planner/queries/PlannerTest/kudu.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/kudu.test b/testdata/workloads/functional-planner/queries/PlannerTest/kudu.test
index 835c41b..0e1b2c6 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/kudu.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/kudu.test
@@ -473,7 +473,7 @@ INSERT INTO KUDU [tpch_kudu.nation]
 00:SCAN HDFS [tpch_parquet.nation]
    partitions=1/1 files=1 size=2.74KB
 ====
-# Unpartitioned table, still has a sort due to clustered hint.
+# Unpartitioned table, clustered hint forces sort node.
 insert into tpch_kudu.nation /* +clustered */
 select * from tpch_parquet.nation
 ---- DISTRIBUTEDPLAN
@@ -483,21 +483,16 @@ INSERT INTO KUDU [tpch_kudu.nation]
 |  order by: n_nationkey ASC NULLS LAST
 |
 00:SCAN HDFS [tpch_parquet.nation]
-   partitions=1/1 files=1 size=2.74KB
+   partitions=1/1 files=1 size=2.75KB
 ====
-# Unpartitioned table, both hints.
-insert into tpch_kudu.nation /* +shuffle,clustered */
+# Unpartitioned table, no sort node without clustered hint.
+insert into tpch_kudu.nation
 select * from tpch_parquet.nation
 ---- DISTRIBUTEDPLAN
 INSERT INTO KUDU [tpch_kudu.nation]
 |
-02:PARTIAL SORT
-|  order by: n_nationkey ASC NULLS LAST
-|
-01:EXCHANGE [UNPARTITIONED]
-|
 00:SCAN HDFS [tpch_parquet.nation]
-   partitions=1/1 files=1 size=2.74KB
+   partitions=1/1 files=1 size=2.75KB
 ====
 # Partition and primary key exprs are all constant, so don't partition/sort.
 insert into functional_kudu.alltypes (id)

http://git-wip-us.apache.org/repos/asf/impala/blob/fc529b7f/testdata/workloads/functional-planner/queries/PlannerTest/order.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/order.test b/testdata/workloads/functional-planner/queries/PlannerTest/order.test
index c64be15..cd60a3a 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/order.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/order.test
@@ -930,6 +930,31 @@ PLAN-ROOT SINK
 insert into functional.alltypes partition(year, month)
 select * from functional.alltypes order by int_col
 ---- PLAN
+WRITE TO HDFS [functional.alltypes, OVERWRITE=false, PARTITION-KEYS=(year,month)]
+|  partitions=24
+|
+01:SORT
+|  order by: year ASC NULLS LAST, month ASC NULLS LAST
+|
+00:SCAN HDFS [functional.alltypes]
+   partitions=24/24 files=24 size=478.45KB
+---- DISTRIBUTEDPLAN
+WRITE TO HDFS [functional.alltypes, OVERWRITE=false, PARTITION-KEYS=(year,month)]
+|  partitions=24
+|
+02:SORT
+|  order by: year ASC NULLS LAST, month ASC NULLS LAST
+|
+01:EXCHANGE [HASH(functional.alltypes.year,functional.alltypes.month)]
+|
+00:SCAN HDFS [functional.alltypes]
+   partitions=24/24 files=24 size=478.45KB
+====
+# IMPALA-5293: test that noclustered hint prevents sort node when order by is used without
+# limit in insert.
+insert into functional.alltypes partition(year, month) /* +noclustered */
+select * from functional.alltypes order by int_col
+---- PLAN
 WRITE TO HDFS [functional.alltypes, OVERWRITE=false, PARTITION-KEYS=(functional.alltypes.year,functional.alltypes.month)]
 |  partitions=24
 |

http://git-wip-us.apache.org/repos/asf/impala/blob/fc529b7f/testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test b/testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test
index 3c1fd44..e82d841 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test
@@ -2408,15 +2408,20 @@ create table dummy_insert
 partitioned by (l_partkey) as
 select l_comment, l_partkey from tpch.lineitem
 ---- PLAN
-Max Per-Host Resource Reservation: Memory=0B
-Per-Host Resource Estimates: Memory=376.99MB
+Max Per-Host Resource Reservation: Memory=12.00MB
+Per-Host Resource Estimates: Memory=306.99MB
 
 F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
-|  Per-Host Resources: mem-estimate=376.99MB mem-reservation=0B
+|  Per-Host Resources: mem-estimate=306.99MB mem-reservation=12.00MB
 WRITE TO HDFS [default.dummy_insert, OVERWRITE=false, PARTITION-KEYS=(l_partkey)]
 |  partitions=200516
 |  mem-estimate=288.99MB mem-reservation=0B
 |
+01:SORT
+|  order by: l_partkey ASC NULLS LAST
+|  mem-estimate=18.00MB mem-reservation=12.00MB spill-buffer=2.00MB
+|  tuple-ids=1 row-size=50B cardinality=6001215
+|
 00:SCAN HDFS [tpch.lineitem]
    partitions=1/1 files=1 size=718.94MB
    stored statistics:
@@ -2426,15 +2431,20 @@ WRITE TO HDFS [default.dummy_insert, OVERWRITE=false, PARTITION-KEYS=(l_partkey)
    mem-estimate=88.00MB mem-reservation=0B
    tuple-ids=0 row-size=50B cardinality=6001215
 ---- DISTRIBUTEDPLAN
-Max Per-Host Resource Reservation: Memory=0B
-Per-Host Resource Estimates: Memory=184.33MB
+Max Per-Host Resource Reservation: Memory=12.00MB
+Per-Host Resource Estimates: Memory=202.33MB
 
 F01:PLAN FRAGMENT [HASH(l_partkey)] hosts=3 instances=3
-|  Per-Host Resources: mem-estimate=96.33MB mem-reservation=0B
+|  Per-Host Resources: mem-estimate=114.33MB mem-reservation=12.00MB
 WRITE TO HDFS [default.dummy_insert, OVERWRITE=false, PARTITION-KEYS=(l_partkey)]
 |  partitions=200516
 |  mem-estimate=96.33MB mem-reservation=0B
 |
+02:SORT
+|  order by: l_partkey ASC NULLS LAST
+|  mem-estimate=18.00MB mem-reservation=12.00MB spill-buffer=2.00MB
+|  tuple-ids=1 row-size=50B cardinality=6001215
+|
 01:EXCHANGE [HASH(l_partkey)]
 |  mem-estimate=0B mem-reservation=0B
 |  tuple-ids=0 row-size=50B cardinality=6001215
@@ -2450,15 +2460,20 @@ Per-Host Resources: mem-estimate=88.00MB mem-reservation=0B
    mem-estimate=88.00MB mem-reservation=0B
    tuple-ids=0 row-size=50B cardinality=6001215
 ---- PARALLELPLANS
-Max Per-Host Resource Reservation: Memory=0B
-Per-Host Resource Estimates: Memory=272.33MB
+Max Per-Host Resource Reservation: Memory=24.00MB
+Per-Host Resource Estimates: Memory=308.33MB
 
 F01:PLAN FRAGMENT [HASH(l_partkey)] hosts=3 instances=6
-|  Per-Host Resources: mem-estimate=96.33MB mem-reservation=0B
+|  Per-Host Resources: mem-estimate=132.33MB mem-reservation=24.00MB
 WRITE TO HDFS [default.dummy_insert, OVERWRITE=false, PARTITION-KEYS=(l_partkey)]
 |  partitions=200516
 |  mem-estimate=48.16MB mem-reservation=0B
 |
+02:SORT
+|  order by: l_partkey ASC NULLS LAST
+|  mem-estimate=18.00MB mem-reservation=12.00MB spill-buffer=2.00MB
+|  tuple-ids=1 row-size=50B cardinality=6001215
+|
 01:EXCHANGE [HASH(l_partkey)]
 |  mem-estimate=0B mem-reservation=0B
 |  tuple-ids=0 row-size=50B cardinality=6001215

http://git-wip-us.apache.org/repos/asf/impala/blob/fc529b7f/testdata/workloads/functional-planner/queries/PlannerTest/with-clause.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/with-clause.test b/testdata/workloads/functional-planner/queries/PlannerTest/with-clause.test
index 0a966c4..9c5f577 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/with-clause.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/with-clause.test
@@ -490,15 +490,21 @@ PLAN-ROOT SINK
 with t1 as (select * from functional.alltypestiny)
 insert into functional.alltypesinsert partition(year, month) select * from t1
 ---- PLAN
-WRITE TO HDFS [functional.alltypesinsert, OVERWRITE=false, PARTITION-KEYS=(functional.alltypestiny.year,functional.alltypestiny.month)]
+WRITE TO HDFS [functional.alltypesinsert, OVERWRITE=false, PARTITION-KEYS=(year,month)]
 |  partitions=4
 |
+01:SORT
+|  order by: year ASC NULLS LAST, month ASC NULLS LAST
+|
 00:SCAN HDFS [functional.alltypestiny]
    partitions=4/4 files=4 size=460B
 ---- DISTRIBUTEDPLAN
-WRITE TO HDFS [functional.alltypesinsert, OVERWRITE=false, PARTITION-KEYS=(functional.alltypestiny.year,functional.alltypestiny.month)]
+WRITE TO HDFS [functional.alltypesinsert, OVERWRITE=false, PARTITION-KEYS=(year,month)]
 |  partitions=4
 |
+02:SORT
+|  order by: year ASC NULLS LAST, month ASC NULLS LAST
+|
 01:EXCHANGE [HASH(functional.alltypestiny.year,functional.alltypestiny.month)]
 |
 00:SCAN HDFS [functional.alltypestiny]
@@ -513,6 +519,43 @@ select * from t1 union all select * from t2
 WRITE TO HDFS [functional.alltypesinsert, OVERWRITE=false, PARTITION-KEYS=(year,month)]
 |  partitions=16
 |
+03:SORT
+|  order by: year ASC NULLS LAST, month ASC NULLS LAST
+|
+00:UNION
+|
+|--02:SCAN HDFS [functional.alltypestiny]
+|     partitions=4/4 files=4 size=460B
+|
+01:SCAN HDFS [functional.alltypestiny]
+   partitions=4/4 files=4 size=460B
+---- DISTRIBUTEDPLAN
+WRITE TO HDFS [functional.alltypesinsert, OVERWRITE=false, PARTITION-KEYS=(year,month)]
+|  partitions=16
+|
+04:SORT
+|  order by: year ASC NULLS LAST, month ASC NULLS LAST
+|
+03:EXCHANGE [HASH(year,month)]
+|
+00:UNION
+|
+|--02:SCAN HDFS [functional.alltypestiny]
+|     partitions=4/4 files=4 size=460B
+|
+01:SCAN HDFS [functional.alltypestiny]
+   partitions=4/4 files=4 size=460B
+====
+# IMPALA-5293: Test with clause in an insert statement and in its query statement. Make
+# sure that noclustered hint prevents addition of a sort node before writing to HDFS.
+with t1 as (select * from functional.alltypestiny)
+insert into functional.alltypesinsert partition(year, month) /* +noclustered */
+with t2 as (select * from functional.alltypestiny)
+select * from t1 union all select * from t2
+---- PLAN
+WRITE TO HDFS [functional.alltypesinsert, OVERWRITE=false, PARTITION-KEYS=(year,month)]
+|  partitions=16
+|
 00:UNION
 |
 |--02:SCAN HDFS [functional.alltypestiny]

http://git-wip-us.apache.org/repos/asf/impala/blob/fc529b7f/testdata/workloads/functional-query/queries/QueryTest/insert.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/insert.test b/testdata/workloads/functional-query/queries/QueryTest/insert.test
index 07a665a..f4c2ff2 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/insert.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/insert.test
@@ -848,7 +848,7 @@ bigint
 # Check that hdfs writers respects mem_limit.
 set mem_limit=64m;
 insert into table alltypesinsert
-partition (year, month)
+partition (year, month) /* +noclustered */
 select at1.id, at1.bool_col, at1.tinyint_col, at1.smallint_col, at1.int_col, at1.bigint_col,
   at1.float_col, at1.double_col, at1.date_string_col, at1.string_col, at1.timestamp_col,
   at1.year, at2.id as month
@@ -859,9 +859,9 @@ DROP PARTITIONS alltypesinsert
 Memory limit exceeded
 ====
 ---- QUERY
-# IMPALA-2521: clustered insert into table
+# IMPALA-5293: noclustered insert into table
 insert into table alltypesinsert
-partition (year, month) /*+ clustered,shuffle */
+partition (year, month) /*+ noclustered,shuffle */
 select * from alltypes;
 ---- SETUP
 DROP PARTITIONS alltypesinsert
@@ -893,9 +893,9 @@ year=2010/month=8/: 310
 year=2010/month=9/: 300
 ====
 ---- QUERY
-# IMPALA-2521: clustered insert into table
+# IMPALA-5293: noclustered insert into table
 insert into table alltypesinsert
-partition (year, month) /*+ clustered,shuffle */
+partition (year, month) /*+ noclustered,shuffle */
 select * from alltypestiny;
 ---- SETUP
 DROP PARTITIONS alltypesinsert
@@ -907,9 +907,9 @@ year=2009/month=3/: 2
 year=2009/month=4/: 2
 ====
 ---- QUERY
-# IMPALA-2521: clustered insert into table
+# IMPALA-5293: noclustered insert into table
 insert into table alltypesinsert
-partition (year, month) /*+ clustered,noshuffle */
+partition (year, month) /*+ noclustered,noshuffle */
 select * from alltypestiny;
 ---- SETUP
 DROP PARTITIONS alltypesinsert
@@ -921,9 +921,9 @@ year=2009/month=3/: 2
 year=2009/month=4/: 2
 ====
 ---- QUERY
-# IMPALA-2521: clustered insert into table
+# IMPALA-5293: noclustered insert into table
 insert into table alltypesinsert
-partition (year, month) /*+ clustered,shuffle */
+partition (year, month) /*+ noclustered,shuffle */
 select * from alltypestiny where int_col = 0;
 ---- SETUP
 DROP PARTITIONS alltypesinsert
@@ -935,9 +935,9 @@ year=2009/month=3/: 1
 year=2009/month=4/: 1
 ====
 ---- QUERY
-# IMPALA-2521: clustered, unpartitioned insert into table
+# IMPALA-5293: noclustered, unpartitioned insert into table
 insert into table alltypesnopart_insert
- /*+ clustered,shuffle */
+ /*+ noclustered,shuffle */
 select id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col,
 double_col, date_string_col, string_col, timestamp_col from alltypessmall;
 ---- SETUP
@@ -946,13 +946,13 @@ RESET alltypesnopart_insert
 : 100
 ====
 ---- QUERY
-# IMPALA-6280: clustered with outer join, inline view, and TupleisNullPredicate
+# IMPALA-6280: clustered (default) with outer join, inline view, and TupleisNullPredicate
 insert into table alltypesinsert (int_col)
-partition (year, month) /*+ clustered,shuffle */
+partition (year, month) /*+ shuffle */
 select v.id, t1.id, t1.month from
 (select coalesce(id, 10) id from functional.alltypessmall) v
 right outer join functional.alltypestiny t1 on t1.id = v.id
 where v.id = 0
 ---- RESULTS
 year=0/month=1/: 1
-====
\ No newline at end of file
+====

http://git-wip-us.apache.org/repos/asf/impala/blob/fc529b7f/testdata/workloads/functional-query/queries/QueryTest/stats-extrapolation.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/stats-extrapolation.test b/testdata/workloads/functional-query/queries/QueryTest/stats-extrapolation.test
index 814a605..b8081ee 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/stats-extrapolation.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/stats-extrapolation.test
@@ -1,6 +1,7 @@
 ====
 ---- QUERY
-create table alltypes like functional_parquet.alltypes;
+# This test relies on a deterministic row order so we use "sort by (id)".
+create table alltypes sort by (id) like functional_parquet.alltypes;
 insert into alltypes partition(year, month)
 select * from functional_parquet.alltypes where year = 2009;
 ====