Posted to commits@impala.apache.org by he...@apache.org on 2016/10/26 05:38:06 UTC

[4/4] incubator-impala git commit: IMPALA-2521: Add clustered hint to insert statements

IMPALA-2521: Add clustered hint to insert statements

This change introduces a clustered/noclustered hint for insert
statements. Specifying this hint adds an additional sort node to the
plan, just before the table sink. As a result, rows are clustered by
partition before they reach the sink, so the partitions can be written
sequentially.
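
As a sketch of the intended usage (using the functional.alltypes table from
the planner tests in this change), the hint is placed after the partition
clause:

  insert into table functional.alltypes partition(year, month) /*+ clustered */
  select * from functional.alltypes;

With the hint, the plan gains a SORT node on the partition columns
(order by: year DESC NULLS LAST, month DESC NULLS LAST) directly below the
HDFS table sink. Specifying clustered together with noclustered in the same
statement is rejected during analysis ("Conflicting INSERT hints: clustered
and noclustered").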

Change-Id: I412153bd8435d792bd61dea268d7a3b884048f14
Reviewed-on: http://gerrit.cloudera.org:8080/4745
Reviewed-by: Alex Behm <al...@cloudera.com>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/c24e9da9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/c24e9da9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/c24e9da9

Branch: refs/heads/master
Commit: c24e9da914e1d5e5dabd1bded5a78452bccff9b5
Parents: ba026f2
Author: Lars Volker <lv...@cloudera.com>
Authored: Tue Oct 11 01:27:14 2016 +0200
Committer: Internal Jenkins <cl...@gerrit.cloudera.org>
Committed: Wed Oct 26 04:56:14 2016 +0000

----------------------------------------------------------------------
 .../org/apache/impala/analysis/InsertStmt.java  |  64 +++++++--
 .../org/apache/impala/analysis/QueryStmt.java   |  41 ++----
 .../org/apache/impala/analysis/SortInfo.java    |  63 ++++++++-
 .../org/apache/impala/catalog/KuduTable.java    |   4 +
 .../java/org/apache/impala/planner/Planner.java |  48 +++++++
 .../impala/analysis/AnalyzeStmtsTest.java       |   4 +-
 .../queries/PlannerTest/insert.test             | 135 +++++++++++++++++++
 .../queries/PlannerTest/kudu.test               |  58 ++++++++
 .../queries/QueryTest/insert.test               |  14 ++
 .../queries/QueryTest/kudu_crud.test            |  33 +++++
 10 files changed, 419 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c24e9da9/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java b/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
index b80fcde..e95516e 100644
--- a/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
@@ -41,6 +41,7 @@ import org.apache.impala.common.FileSystemUtil;
 import org.apache.impala.planner.DataSink;
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 
@@ -104,22 +105,32 @@ public class InsertStmt extends StatementBase {
   // Set in analyze(). Contains metadata of target table to determine type of sink.
   private Table table_;
 
-  // Set in analyze(). Exprs corresponding to the partitionKeyValues,
+  // Set in analyze(). Exprs corresponding to the partitionKeyValues.
   private List<Expr> partitionKeyExprs_ = Lists.newArrayList();
 
   // Indicates whether this insert stmt has a shuffle or noshuffle plan hint.
   // Both flags may be false. Only one of them may be true, not both.
-  // Shuffle forces data repartitioning before then data sink, and noshuffle
+  // Shuffle forces data repartitioning before the data sink, and noshuffle
   // prevents it. Set in analyze() based on planHints_.
   private boolean hasShuffleHint_ = false;
   private boolean hasNoShuffleHint_ = false;
 
+  // Indicates whether this insert stmt has a clustered or noclustered hint. If clustering
+  // is requested, we add a clustering phase before the data sink, so that partitions can
+  // be written sequentially. The default behavior is to not perform an additional
+  // clustering step.
+  private boolean hasClusteredHint_ = false;
+
   // Output expressions that produce the final results to write to the target table. May
   // include casts, and NullLiterals where an output column isn't explicitly mentioned.
   // Set in prepareExpressions(). The i'th expr produces the i'th column of the target
   // table.
   private ArrayList<Expr> resultExprs_ = Lists.newArrayList();
 
+  // Set in analyze(). Exprs corresponding to key columns of Kudu tables. Empty for
+  // non-Kudu tables.
+  private ArrayList<Expr> primaryKeyExprs_ = Lists.newArrayList();
+
   // END: Members that need to be reset()
   /////////////////////////////////////////
 
@@ -170,7 +181,9 @@ public class InsertStmt extends StatementBase {
     partitionKeyExprs_.clear();
     hasShuffleHint_ = false;
     hasNoShuffleHint_ = false;
+    hasClusteredHint_ = false;
     resultExprs_.clear();
+    primaryKeyExprs_.clear();
   }
 
   @Override
@@ -493,7 +506,7 @@ public class InsertStmt extends StatementBase {
   }
 
   /**
-   * Performs three final parts of the analysis:
+   * Performs four final parts of the analysis:
    * 1. Checks type compatibility between all expressions and their targets
    *
    * 2. Populates partitionKeyExprs with type-compatible expressions, in Hive
@@ -503,6 +516,8 @@ public class InsertStmt extends StatementBase {
    * for all expressions in the select-list. Unmentioned columns are assigned NULL literal
    * expressions.
    *
+   * 4. Result exprs for key columns of Kudu tables are stored in primaryKeyExprs_.
+   *
    * If necessary, adds casts to the expressions to make them compatible with the type of
    * the corresponding column.
    *
@@ -585,7 +600,19 @@ public class InsertStmt extends StatementBase {
           resultExprs_.add(NullLiteral.create(tblColumn.getType()));
         }
       }
+      // Store exprs for Kudu key columns.
+      if (matchFound && table_ instanceof KuduTable) {
+        KuduTable kuduTable = (KuduTable) table_;
+        if (kuduTable.isPrimaryKeyColumn(tblColumn.getName())) {
+          primaryKeyExprs_.add(Iterables.getLast(resultExprs_));
+        }
+      }
+    }
+
+    if (table_ instanceof KuduTable) {
+      Preconditions.checkState(!primaryKeyExprs_.isEmpty());
     }
+
     // TODO: Check that HBase row-key columns are not NULL? See IMPALA-406
     if (needsGeneratedQueryStatement_) {
       // Build a query statement that returns NULL for every column
@@ -603,28 +630,33 @@ public class InsertStmt extends StatementBase {
     if (planHints_ == null) return;
     if (!planHints_.isEmpty() && table_ instanceof HBaseTable) {
       throw new AnalysisException("INSERT hints are only supported for inserting into " +
-          "Hdfs tables.");
+          "Hdfs and Kudu tables.");
     }
+    boolean hasNoClusteredHint = false;
     for (String hint: planHints_) {
       if (hint.equalsIgnoreCase("SHUFFLE")) {
-        if (hasNoShuffleHint_) {
-          throw new AnalysisException("Conflicting INSERT hint: " + hint);
-        }
         hasShuffleHint_ = true;
         analyzer.setHasPlanHints();
       } else if (hint.equalsIgnoreCase("NOSHUFFLE")) {
-        if (hasShuffleHint_) {
-          throw new AnalysisException("Conflicting INSERT hint: " + hint);
-        }
         hasNoShuffleHint_ = true;
         analyzer.setHasPlanHints();
+      } else if (hint.equalsIgnoreCase("CLUSTERED")) {
+        hasClusteredHint_ = true;
+        analyzer.setHasPlanHints();
+      } else if (hint.equalsIgnoreCase("NOCLUSTERED")) {
+        hasNoClusteredHint = true;
+        analyzer.setHasPlanHints();
       } else {
         analyzer.addWarning("INSERT hint not recognized: " + hint);
       }
     }
     // Both flags may be false or one of them may be true, but not both.
-    Preconditions.checkState((!hasShuffleHint_ && !hasNoShuffleHint_)
-        || (hasShuffleHint_ ^ hasNoShuffleHint_));
+    if (hasShuffleHint_ && hasNoShuffleHint_) {
+      throw new AnalysisException("Conflicting INSERT hints: shuffle and noshuffle");
+    }
+    if (hasClusteredHint_ && hasNoClusteredHint) {
+      throw new AnalysisException("Conflicting INSERT hints: clustered and noclustered");
+    }
   }
 
   public List<String> getPlanHints() { return planHints_; }
@@ -641,7 +673,9 @@ public class InsertStmt extends StatementBase {
   public List<Expr> getPartitionKeyExprs() { return partitionKeyExprs_; }
   public boolean hasShuffleHint() { return hasShuffleHint_; }
   public boolean hasNoShuffleHint() { return hasNoShuffleHint_; }
+  public boolean hasClusteredHint() { return hasClusteredHint_; }
   public ArrayList<Expr> getResultExprs() { return resultExprs_; }
+  public ArrayList<Expr> getPrimaryKeyExprs() { return primaryKeyExprs_; }
 
   public DataSink createDataSink() {
     // analyze() must have been called before.
@@ -651,12 +685,14 @@ public class InsertStmt extends StatementBase {
   }
 
   /**
-   * Substitutes the result expressions and the partition key expressions with smap.
-   * Preserves the original types of those expressions during the substitution.
+   * Substitutes the result expressions, the partition key expressions, and the primary
+   * key expressions with smap. Preserves the original types of those expressions during
+   * the substitution.
    */
   public void substituteResultExprs(ExprSubstitutionMap smap, Analyzer analyzer) {
     resultExprs_ = Expr.substituteList(resultExprs_, smap, analyzer, true);
     partitionKeyExprs_ = Expr.substituteList(partitionKeyExprs_, smap, analyzer, true);
+    primaryKeyExprs_ = Expr.substituteList(primaryKeyExprs_, smap, analyzer, true);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c24e9da9/fe/src/main/java/org/apache/impala/analysis/QueryStmt.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/QueryStmt.java b/fe/src/main/java/org/apache/impala/analysis/QueryStmt.java
index 392b961..464b5a5 100644
--- a/fe/src/main/java/org/apache/impala/analysis/QueryStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/QueryStmt.java
@@ -233,10 +233,8 @@ public abstract class QueryStmt extends StatementBase {
    * the order-by and result expressions. Those SlotRefs in the ordering and result exprs
    * are substituted with SlotRefs into the new tuple. This simplifies sorting logic for
    * total (no limit) sorts.
-   * Done after analyzeAggregation() since ordering and result exprs may refer to
-   * the outputs of aggregation. Invoked for UnionStmt as well since
-   * TODO: We could do something more sophisticated than simply copying input
-   * slotrefs - e.g. compute some order-by expressions.
+   * Done after analyzeAggregation() since ordering and result exprs may refer to the
+   * outputs of aggregation.
    */
   protected void createSortTupleInfo(Analyzer analyzer) throws AnalysisException {
     Preconditions.checkState(evaluateOrderBy_);
@@ -249,35 +247,22 @@ public abstract class QueryStmt extends StatementBase {
       }
     }
 
-    // sourceSlots contains the slots from the input row to materialize.
-    Set<SlotRef> sourceSlots = Sets.newHashSet();
-    TreeNode.collect(resultExprs_, Predicates.instanceOf(SlotRef.class), sourceSlots);
-    TreeNode.collect(sortInfo_.getOrderingExprs(), Predicates.instanceOf(SlotRef.class),
-        sourceSlots);
-
-    TupleDescriptor sortTupleDesc = analyzer.getDescTbl().createTupleDescriptor("sort");
-    List<Expr> sortTupleExprs = Lists.newArrayList();
-    sortTupleDesc.setIsMaterialized(true);
-    // substOrderBy is the mapping from slot refs in the input row to slot refs in the
-    // materialized sort tuple.
-    ExprSubstitutionMap substOrderBy = new ExprSubstitutionMap();
-    for (SlotRef origSlotRef: sourceSlots) {
-      SlotDescriptor origSlotDesc = origSlotRef.getDesc();
-      SlotDescriptor materializedDesc =
-          analyzer.copySlotDescriptor(origSlotDesc, sortTupleDesc);
-      SlotRef cloneRef = new SlotRef(materializedDesc);
-      substOrderBy.put(origSlotRef, cloneRef);
+    ExprSubstitutionMap smap = sortInfo_.createSortTupleInfo(resultExprs_, analyzer);
+
+    for (int i = 0; i < smap.size(); ++i) {
+      Preconditions.checkState(smap.getLhs().get(i) instanceof SlotRef);
+      Preconditions.checkState(smap.getRhs().get(i) instanceof SlotRef);
+      SlotRef inputSlotRef = (SlotRef) smap.getLhs().get(i);
+      SlotRef outputSlotRef = (SlotRef) smap.getRhs().get(i);
       if (hasLimit()) {
-        analyzer.registerValueTransfer(origSlotRef.getSlotId(), cloneRef.getSlotId());
+        analyzer.registerValueTransfer(
+            inputSlotRef.getSlotId(), outputSlotRef.getSlotId());
       } else {
-        analyzer.createAuxEquivPredicate(cloneRef, origSlotRef);
+        analyzer.createAuxEquivPredicate(outputSlotRef, inputSlotRef);
       }
-      sortTupleExprs.add(origSlotRef);
     }
 
-    resultExprs_ = Expr.substituteList(resultExprs_, substOrderBy, analyzer, false);
-    sortInfo_.substituteOrderingExprs(substOrderBy, analyzer);
-    sortInfo_.setMaterializedTupleInfo(sortTupleDesc, sortTupleExprs);
+    substituteResultExprs(smap, analyzer);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c24e9da9/fe/src/main/java/org/apache/impala/analysis/SortInfo.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/SortInfo.java b/fe/src/main/java/org/apache/impala/analysis/SortInfo.java
index 8d8f955..31f4d18 100644
--- a/fe/src/main/java/org/apache/impala/analysis/SortInfo.java
+++ b/fe/src/main/java/org/apache/impala/analysis/SortInfo.java
@@ -16,11 +16,15 @@
 // under the License.
 
 package org.apache.impala.analysis;
+import org.apache.impala.common.TreeNode;
 
 import java.util.List;
+import java.util.Set;
 
 import com.google.common.base.Preconditions;
+import com.google.common.base.Predicates;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 
 /**
  * Encapsulates all the information needed to compute ORDER BY
@@ -34,7 +38,7 @@ public class SortInfo {
   private final List<Boolean> isAscOrder_;
   // True if "NULLS FIRST", false if "NULLS LAST", null if not specified.
   private final List<Boolean> nullsFirstParams_;
-  // The single tuple that is materialized, sorted and output by a sort operator
+  // The single tuple that is materialized, sorted, and output by a sort operator
   // (i.e. SortNode or TopNNode)
   private TupleDescriptor sortTupleDesc_;
   // Input expressions materialized into sortTupleDesc_. One expr per slot in
@@ -63,8 +67,14 @@ public class SortInfo {
     }
   }
 
+  /**
+   * Sets sortTupleDesc_, which is the internal row representation to be materialized and
+   * sorted. The source exprs of the slots in sortTupleDesc_ are changed to those in
+   * tupleSlotExprs.
+   */
   public void setMaterializedTupleInfo(
       TupleDescriptor tupleDesc, List<Expr> tupleSlotExprs) {
+    Preconditions.checkState(tupleDesc.getSlots().size() == tupleSlotExprs.size());
     sortTupleDesc_ = tupleDesc;
     sortTupleSlotExprs_ = tupleSlotExprs;
     for (int i = 0; i < sortTupleDesc_.getSlots().size(); ++i) {
@@ -113,6 +123,11 @@ public class SortInfo {
     analyzer.materializeSlots(substMaterializedExprs);
   }
 
+  /**
+   * Replaces orderingExprs_ according to smap. This needs to be called to make sure that
+   * the ordering exprs refer to the new tuple materialized by this sort instead of the
+   * original input.
+   */
   public void substituteOrderingExprs(ExprSubstitutionMap smap, Analyzer analyzer) {
     orderingExprs_ = Expr.substituteList(orderingExprs_, smap, analyzer, false);
   }
@@ -128,4 +143,50 @@ public class SortInfo {
 
   @Override
   public SortInfo clone() { return new SortInfo(this); }
+
+  /**
+   * Create a tuple descriptor for the single tuple that is materialized, sorted, and
+   * output by the sort node. Done by materializing slot refs in the order-by and given
+   * result expressions. Those slot refs in the ordering and result exprs are substituted
+   * with slot refs into the new tuple. This simplifies the sorting logic for total and
+   * top-n sorts. The substitution map is returned.
+   * TODO: We could do something more sophisticated than simply copying input slot refs -
+   * e.g. compute some order-by expressions.
+   */
+  public ExprSubstitutionMap createSortTupleInfo(
+      List<Expr> resultExprs, Analyzer analyzer) {
+    // sourceSlots contains the slots from the sort input to materialize.
+    Set<SlotRef> sourceSlots = Sets.newHashSet();
+
+    TreeNode.collect(resultExprs, Predicates.instanceOf(SlotRef.class), sourceSlots);
+    TreeNode.collect(orderingExprs_, Predicates.instanceOf(SlotRef.class), sourceSlots);
+
+    // The descriptor for the tuples on which the sort operates.
+    TupleDescriptor sortTupleDesc = analyzer.getDescTbl().createTupleDescriptor("sort");
+    sortTupleDesc.setIsMaterialized(true);
+
+    List<Expr> sortTupleExprs = Lists.newArrayList();
+
+    // substOrderBy is the mapping from slot refs in the sort node's input to slot refs in
+    // the materialized sort tuple. Each slot ref in the input gets cloned and builds up
+    // the tuple operated on and returned by the sort node.
+    ExprSubstitutionMap substOrderBy = new ExprSubstitutionMap();
+    for (SlotRef origSlotRef: sourceSlots) {
+      SlotDescriptor origSlotDesc = origSlotRef.getDesc();
+      SlotDescriptor materializedDesc =
+          analyzer.copySlotDescriptor(origSlotDesc, sortTupleDesc);
+      SlotRef cloneRef = new SlotRef(materializedDesc);
+      substOrderBy.put(origSlotRef, cloneRef);
+      sortTupleExprs.add(origSlotRef);
+    }
+
+    // The ordering exprs still point to the old slot refs and need to be replaced with
+    // ones that point to the slot refs into the sort's output tuple.
+    substituteOrderingExprs(substOrderBy, analyzer);
+
+    // Update the tuple descriptor used to materialize the input of the sort.
+    setMaterializedTupleInfo(sortTupleDesc, sortTupleExprs);
+
+    return substOrderBy;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c24e9da9/fe/src/main/java/org/apache/impala/catalog/KuduTable.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/KuduTable.java b/fe/src/main/java/org/apache/impala/catalog/KuduTable.java
index 7d78715..a7a5933 100644
--- a/fe/src/main/java/org/apache/impala/catalog/KuduTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/KuduTable.java
@@ -326,6 +326,10 @@ public class KuduTable extends Table {
     return tbl;
   }
 
+  public boolean isPrimaryKeyColumn(String name) {
+    return primaryKeyColumnNames_.contains(name);
+  }
+
   public TResultSet getTableStats() throws ImpalaRuntimeException {
     TResultSet result = new TResultSet();
     TResultSetMetadata resultSchema = new TResultSetMetadata();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c24e9da9/fe/src/main/java/org/apache/impala/planner/Planner.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/Planner.java b/fe/src/main/java/org/apache/impala/planner/Planner.java
index 8abb901..fade723 100644
--- a/fe/src/main/java/org/apache/impala/planner/Planner.java
+++ b/fe/src/main/java/org/apache/impala/planner/Planner.java
@@ -20,6 +20,7 @@ package org.apache.impala.planner;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Set;
 
 import org.apache.impala.analysis.AnalysisContext;
 import org.apache.impala.analysis.Analyzer;
@@ -29,11 +30,15 @@ import org.apache.impala.analysis.ExprSubstitutionMap;
 import org.apache.impala.analysis.InsertStmt;
 import org.apache.impala.analysis.JoinOperator;
 import org.apache.impala.analysis.QueryStmt;
+import org.apache.impala.analysis.SlotRef;
+import org.apache.impala.analysis.SortInfo;
 import org.apache.impala.catalog.HBaseTable;
+import org.apache.impala.catalog.KuduTable;
 import org.apache.impala.catalog.Table;
 import org.apache.impala.common.ImpalaException;
 import org.apache.impala.common.PrintUtils;
 import org.apache.impala.common.RuntimeEnv;
+import org.apache.impala.common.TreeNode;
 import org.apache.impala.thrift.TExplainLevel;
 import org.apache.impala.thrift.TQueryCtx;
 import org.apache.impala.thrift.TQueryExecRequest;
@@ -45,7 +50,9 @@ import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Predicates;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 
 /**
  * Creates an executable plan from an analyzed parse tree and query options.
@@ -140,6 +147,8 @@ public class Planner {
         rootFragment = distributedPlanner.createInsertFragment(
             rootFragment, insertStmt, ctx_.getRootAnalyzer(), fragments);
       }
+      // Add optional sort node to the plan, based on clustered/noclustered plan hint.
+      createClusteringSort(insertStmt, rootFragment, ctx_.getRootAnalyzer());
       // set up table sink for root fragment
       rootFragment.setSink(insertStmt.createDataSink());
       resultExprs = insertStmt.getResultExprs();
@@ -483,4 +492,43 @@ public class Planner {
     newJoinNode.init(analyzer);
     return newJoinNode;
   }
+
+  /**
+   * Insert a sort node on top of the plan, depending on the clustered/noclustered plan
+   * hint. This will sort the data produced by 'inputFragment' by the clustering columns
+   * (key columns for Kudu tables), so that partitions can be written sequentially in the
+   * table sink.
+   */
+  public void createClusteringSort(InsertStmt insertStmt, PlanFragment inputFragment,
+       Analyzer analyzer) throws ImpalaException {
+    if (!insertStmt.hasClusteredHint()) return;
+
+    List<Expr> orderingExprs;
+    if (insertStmt.getTargetTable() instanceof KuduTable) {
+      orderingExprs = Lists.newArrayList(insertStmt.getPrimaryKeyExprs());
+    } else {
+      orderingExprs = Lists.newArrayList(insertStmt.getPartitionKeyExprs());
+    }
+    // Ignore constants for the sake of clustering.
+    Expr.removeConstants(orderingExprs);
+
+    if (orderingExprs.isEmpty()) return;
+
+    // Build sortinfo to sort by the ordering exprs.
+    List<Boolean> isAscOrder = Collections.nCopies(orderingExprs.size(), false);
+    List<Boolean> nullsFirstParams = Collections.nCopies(orderingExprs.size(), false);
+    SortInfo sortInfo = new SortInfo(orderingExprs, isAscOrder, nullsFirstParams);
+
+    ExprSubstitutionMap smap = sortInfo.createSortTupleInfo(
+        insertStmt.getResultExprs(), analyzer);
+    sortInfo.getSortTupleDescriptor().materializeSlots();
+
+    insertStmt.substituteResultExprs(smap, analyzer);
+
+    SortNode sortNode = new SortNode(ctx_.getNextNodeId(), inputFragment.getPlanRoot(),
+        sortInfo, false, 0);
+    sortNode.init(analyzer);
+
+    inputFragment.setPlanRoot(sortNode);
+  }
 }
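
To make the Kudu path above concrete, a sketch based on the
functional_kudu.alltypes table used in the PlannerTest cases in this change:

  insert into table functional_kudu.alltypes /*+ clustered */
  select * from functional_kudu.alltypes;

Here createClusteringSort() collects the primary key exprs rather than the
partition key exprs, so the plan contains a SORT node on the key column
(order by: id DESC NULLS LAST) feeding the Kudu table sink. For a
non-partitioned HDFS target the ordering exprs are empty after constants are
removed, and the method returns without adding a sort node, as the
alltypesnopart cases in insert.test show.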

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c24e9da9/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java b/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java
index f437bc8..31c0fdc 100644
--- a/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java
@@ -1742,12 +1742,12 @@ public class AnalyzeStmtsTest extends AnalyzerTest {
       AnalysisError(String.format(
           "insert into table functional_hbase.alltypes %sshuffle%s " +
           "select * from functional_hbase.alltypes", prefix, suffix),
-          "INSERT hints are only supported for inserting into Hdfs tables.");
+          "INSERT hints are only supported for inserting into Hdfs and Kudu tables.");
       // Conflicting plan hints.
       AnalysisError("insert into table functional.alltypessmall " +
           "partition (year, month) /* +shuffle,noshuffle */ " +
           "select * from functional.alltypes",
-          "Conflicting INSERT hint: noshuffle");
+          "Conflicting INSERT hints: shuffle and noshuffle");
     }
 
     // Multiple non-conflicting hints and case insensitivity of hints.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c24e9da9/testdata/workloads/functional-planner/queries/PlannerTest/insert.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/insert.test b/testdata/workloads/functional-planner/queries/PlannerTest/insert.test
index b608dea..4efb68b 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/insert.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/insert.test
@@ -570,3 +570,138 @@ WRITE TO HDFS [functional.alltypesnopart, OVERWRITE=false]
 00:SCAN HDFS [functional.alltypes]
    partitions=24/24 files=24 size=478.45KB
 ====
+# IMPALA-2521: clustered insert into partitioned table adds sort node.
+insert into table functional.alltypes partition(year, month) /*+ clustered */
+select * from functional.alltypes
+---- PLAN
+WRITE TO HDFS [functional.alltypes, OVERWRITE=false, PARTITION-KEYS=(year,month)]
+|  partitions=24
+|
+01:SORT
+|  order by: year DESC NULLS LAST, month DESC NULLS LAST
+|
+00:SCAN HDFS [functional.alltypes]
+   partitions=24/24 files=24 size=478.45KB
+---- DISTRIBUTEDPLAN
+WRITE TO HDFS [functional.alltypes, OVERWRITE=false, PARTITION-KEYS=(year,month)]
+|  partitions=24
+|
+02:SORT
+|  order by: year DESC NULLS LAST, month DESC NULLS LAST
+|
+01:EXCHANGE [HASH(functional.alltypes.year,functional.alltypes.month)]
+|
+00:SCAN HDFS [functional.alltypes]
+   partitions=24/24 files=24 size=478.45KB
+====
+# IMPALA-2521: clustered insert into partitioned table adds sort node.
+insert into table functional.alltypes partition(year, month) /*+ clustered,noshuffle */
+select * from functional.alltypes
+---- PLAN
+WRITE TO HDFS [functional.alltypes, OVERWRITE=false, PARTITION-KEYS=(year,month)]
+|  partitions=24
+|
+01:SORT
+|  order by: year DESC NULLS LAST, month DESC NULLS LAST
+|
+00:SCAN HDFS [functional.alltypes]
+   partitions=24/24 files=24 size=478.45KB
+---- DISTRIBUTEDPLAN
+WRITE TO HDFS [functional.alltypes, OVERWRITE=false, PARTITION-KEYS=(year,month)]
+|  partitions=24
+|
+01:SORT
+|  order by: year DESC NULLS LAST, month DESC NULLS LAST
+|
+00:SCAN HDFS [functional.alltypes]
+   partitions=24/24 files=24 size=478.45KB
+====
+# IMPALA-2521: clustered insert into partitioned table adds sort node. Subquery in
+# WHERE-clause exercises the reset() + analyze() path during rewrite.
+insert into table functional.alltypes partition(year, month) /*+ clustered */
+select * from functional.alltypes
+where int_col = (select max(int_col) from functional.alltypes)
+---- PLAN
+WRITE TO HDFS [functional.alltypes, OVERWRITE=false, PARTITION-KEYS=(year,month)]
+|  partitions=24
+|
+04:SORT
+|  order by: year DESC NULLS LAST, month DESC NULLS LAST
+|
+03:HASH JOIN [LEFT SEMI JOIN]
+|  hash predicates: int_col = max(int_col)
+|  runtime filters: RF000 <- max(int_col)
+|
+|--02:AGGREGATE [FINALIZE]
+|  |  output: max(int_col)
+|  |
+|  01:SCAN HDFS [functional.alltypes]
+|     partitions=24/24 files=24 size=478.45KB
+|
+00:SCAN HDFS [functional.alltypes]
+   partitions=24/24 files=24 size=478.45KB
+   runtime filters: RF000 -> int_col
+---- DISTRIBUTEDPLAN
+WRITE TO HDFS [functional.alltypes, OVERWRITE=false, PARTITION-KEYS=(year,month)]
+|  partitions=24
+|
+08:SORT
+|  order by: year DESC NULLS LAST, month DESC NULLS LAST
+|
+07:EXCHANGE [HASH(functional.alltypes.year,functional.alltypes.month)]
+|
+03:HASH JOIN [LEFT SEMI JOIN, BROADCAST]
+|  hash predicates: int_col = max(int_col)
+|  runtime filters: RF000 <- max(int_col)
+|
+|--06:EXCHANGE [BROADCAST]
+|  |
+|  05:AGGREGATE [FINALIZE]
+|  |  output: max:merge(int_col)
+|  |
+|  04:EXCHANGE [UNPARTITIONED]
+|  |
+|  02:AGGREGATE
+|  |  output: max(int_col)
+|  |
+|  01:SCAN HDFS [functional.alltypes]
+|     partitions=24/24 files=24 size=478.45KB
+|
+00:SCAN HDFS [functional.alltypes]
+   partitions=24/24 files=24 size=478.45KB
+   runtime filters: RF000 -> int_col
+====
+# IMPALA-2521: clustered insert into non-partitioned table does not add sort node.
+insert into table functional.alltypesnopart /*+ clustered */
+select * from functional.alltypesnopart
+---- PLAN
+WRITE TO HDFS [functional.alltypesnopart, OVERWRITE=false]
+|  partitions=1
+|
+00:SCAN HDFS [functional.alltypesnopart]
+   partitions=1/1 files=0 size=0B
+---- DISTRIBUTEDPLAN
+WRITE TO HDFS [functional.alltypesnopart, OVERWRITE=false]
+|  partitions=1
+|
+00:SCAN HDFS [functional.alltypesnopart]
+   partitions=1/1 files=0 size=0B
+====
+# IMPALA-2521: clustered insert into non-partitioned table does not add sort node.
+insert into table functional.alltypesnopart /*+ clustered,shuffle */
+select * from functional.alltypesnopart
+---- PLAN
+WRITE TO HDFS [functional.alltypesnopart, OVERWRITE=false]
+|  partitions=1
+|
+00:SCAN HDFS [functional.alltypesnopart]
+   partitions=1/1 files=0 size=0B
+---- DISTRIBUTEDPLAN
+WRITE TO HDFS [functional.alltypesnopart, OVERWRITE=false]
+|  partitions=1
+|
+01:EXCHANGE [UNPARTITIONED]
+|
+00:SCAN HDFS [functional.alltypesnopart]
+   partitions=1/1 files=0 size=0B
+====

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c24e9da9/testdata/workloads/functional-planner/queries/PlannerTest/kudu.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/kudu.test b/testdata/workloads/functional-planner/queries/PlannerTest/kudu.test
index d3022a8..78a01df 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/kudu.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/kudu.test
@@ -224,3 +224,61 @@ PLAN-ROOT SINK
 00:SCAN KUDU [functional_kudu.alltypes]
    kudu predicates: id > 1475059865, id < 1475059775
 ====
+# IMPALA-2521: clustered insert into table adds sort node.
+insert into table functional_kudu.alltypes /*+ clustered */
+select * from functional_kudu.alltypes
+---- PLAN
+INSERT INTO KUDU [functional_kudu.alltypes]
+|  check unique keys: false
+|
+01:SORT
+|  order by: id DESC NULLS LAST
+|
+00:SCAN KUDU [functional_kudu.alltypes]
+---- DISTRIBUTEDPLAN
+INSERT INTO KUDU [functional_kudu.alltypes]
+|  check unique keys: false
+|
+01:SORT
+|  order by: id DESC NULLS LAST
+|
+00:SCAN KUDU [functional_kudu.alltypes]
+====
+# IMPALA-2521: clustered insert into table adds sort node, correctly substituting exprs.
+insert into table functional_kudu.testtbl /*+ clustered */
+select id, name, maxzip as zip
+from (
+select id, max(zip) as maxzip, name
+from functional_kudu.testtbl group by id, name
+) as sub;
+---- PLAN
+INSERT INTO KUDU [functional_kudu.testtbl]
+|  check unique keys: false
+|
+02:SORT
+|  order by: id DESC NULLS LAST
+|
+01:AGGREGATE [FINALIZE]
+|  output: max(zip)
+|  group by: id, name
+|
+00:SCAN KUDU [functional_kudu.testtbl]
+---- DISTRIBUTEDPLAN
+INSERT INTO KUDU [functional_kudu.testtbl]
+|  check unique keys: false
+|
+04:SORT
+|  order by: id DESC NULLS LAST
+|
+03:AGGREGATE [FINALIZE]
+|  output: max:merge(zip)
+|  group by: id, name
+|
+02:EXCHANGE [HASH(id,name)]
+|
+01:AGGREGATE [STREAMING]
+|  output: max(zip)
+|  group by: id, name
+|
+00:SCAN KUDU [functional_kudu.testtbl]
+====

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c24e9da9/testdata/workloads/functional-query/queries/QueryTest/insert.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/insert.test b/testdata/workloads/functional-query/queries/QueryTest/insert.test
index d6359ad..b0c5c7c 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/insert.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/insert.test
@@ -858,3 +858,17 @@ DROP PARTITIONS alltypesinsert
 ---- CATCH
 Memory limit exceeded
 ====
+---- QUERY
+# IMPALA-2521: clustered insert into table
+insert into table alltypesinsert
+partition (year, month) /*+ clustered */
+select * from functional.alltypessmall;
+---- SETUP
+DROP PARTITIONS alltypesinsert
+RESET alltypesinsert
+---- RESULTS
+year=2009/month=1/: 25
+year=2009/month=2/: 25
+year=2009/month=3/: 25
+year=2009/month=4/: 25
+====

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c24e9da9/testdata/workloads/functional-query/queries/QueryTest/kudu_crud.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/kudu_crud.test b/testdata/workloads/functional-query/queries/QueryTest/kudu_crud.test
index e4f3205..18281e3 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/kudu_crud.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/kudu_crud.test
@@ -332,3 +332,36 @@ DELETE FROM kudu_test_tbl WHERE id > -1;
 ---- RUNTIME_PROFILE
 row_regex: .*NumModifiedRows: 7300.*
 ====
+---- QUERY
+# IMPALA-2521: clustered insert into table.
+create table impala_2521
+(id bigint primary key, name string, zip int)
+distribute by hash into 3 buckets stored as kudu
+---- RESULTS
+====
+---- QUERY
+insert into impala_2521 /*+ clustered */
+select id, name, maxzip as zip
+from (
+select tinyint_col as id, cast(max(int_col) + 1 as int) as maxzip, string_col as name
+from functional_kudu.alltypessmall group by id, name
+) as sub;
+---- RESULTS
+: 10
+====
+---- QUERY
+select * from impala_2521
+---- RESULTS
+0,'0',1
+1,'1',2
+2,'2',3
+3,'3',4
+4,'4',5
+5,'5',6
+6,'6',7
+7,'7',8
+8,'8',9
+9,'9',10
+---- TYPES
+BIGINT,STRING,INT
+====