Posted to commits@impala.apache.org by ta...@apache.org on 2016/12/15 23:00:28 UTC

[02/50] [abbrv] incubator-impala git commit: IMPALA-4561: Replace DISTRIBUTE BY with PARTITION BY in CREATE TABLE

IMPALA-4561: Replace DISTRIBUTE BY with PARTITION BY in CREATE TABLE
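
For illustration, the user-visible change (table, column, and bucket count
below are hypothetical):

  -- Before this change:
  CREATE TABLE t (id BIGINT, name STRING, PRIMARY KEY (id))
  DISTRIBUTE BY HASH (id) INTO 4 BUCKETS
  STORED AS KUDU;

  -- After this change:
  CREATE TABLE t (id BIGINT, name STRING, PRIMARY KEY (id))
  PARTITION BY HASH (id) INTO 4 BUCKETS
  STORED AS KUDU;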

Change-Id: I0e07c41eabb4c8cb95754cf04293cbd9e03d6ab2
Reviewed-on: http://gerrit.cloudera.org:8080/5317
Reviewed-by: Dimitris Tsirogiannis <dt...@cloudera.com>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/cba93f1a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/cba93f1a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/cba93f1a

Branch: refs/heads/hadoop-next
Commit: cba93f1ac3d4c0219c6266924493ad19c8c10556
Parents: f837754
Author: Dimitris Tsirogiannis <dt...@cloudera.com>
Authored: Thu Dec 1 20:43:43 2016 -0800
Committer: Internal Jenkins <cl...@gerrit.cloudera.org>
Committed: Tue Dec 6 10:41:53 2016 +0000

----------------------------------------------------------------------
 common/thrift/CatalogObjects.thrift             |  20 +-
 common/thrift/JniCatalog.thrift                 |   4 +-
 fe/src/main/cup/sql-parser.cup                  |  56 ++---
 .../AlterTableAddDropRangePartitionStmt.java    |   4 +-
 .../analysis/CreateTableAsSelectStmt.java       |   3 +-
 .../apache/impala/analysis/CreateTableStmt.java |  38 ++--
 .../apache/impala/analysis/DistributeParam.java | 211 -------------------
 .../impala/analysis/KuduPartitionParam.java     | 211 +++++++++++++++++++
 .../apache/impala/analysis/RangePartition.java  |  18 +-
 .../apache/impala/analysis/TableDataLayout.java |  21 +-
 .../org/apache/impala/analysis/TableDef.java    |   6 +-
 .../org/apache/impala/analysis/ToSqlUtils.java  |  14 +-
 .../org/apache/impala/catalog/KuduTable.java    |  81 +++----
 .../impala/service/KuduCatalogOpExecutor.java   |  36 ++--
 .../apache/impala/analysis/AnalyzeDDLTest.java  | 164 +++++++-------
 .../impala/analysis/AuthorizationTest.java      |   2 +-
 .../org/apache/impala/analysis/ParserTest.java  |  58 ++---
 testdata/bin/generate-schema-statements.py      |   2 +-
 .../functional/functional_schema_template.sql   |  28 +--
 testdata/datasets/tpcds/tpcds_kudu_template.sql |  48 ++---
 testdata/datasets/tpch/tpch_kudu_template.sql   |  16 +-
 testdata/datasets/tpch/tpch_schema_template.sql |  16 +-
 .../queries/PlannerTest/lineage.test            |   4 +-
 .../queries/QueryTest/kudu-scan-node.test       |   6 +-
 .../QueryTest/kudu-timeouts-catalogd.test       |   2 +-
 .../queries/QueryTest/kudu_alter.test           |   6 +-
 .../queries/QueryTest/kudu_create.test          |  24 +--
 .../queries/QueryTest/kudu_delete.test          |   6 +-
 .../queries/QueryTest/kudu_describe.test        |   2 +-
 .../queries/QueryTest/kudu_insert.test          |  10 +-
 .../queries/QueryTest/kudu_partition_ddl.test   |  28 +--
 .../queries/QueryTest/kudu_stats.test           |   2 +-
 .../queries/QueryTest/kudu_update.test          |   2 +-
 .../queries/QueryTest/kudu_upsert.test          |   4 +-
 tests/query_test/test_cancellation.py           |   2 +-
 tests/query_test/test_kudu.py                   |  38 ++--
 tests/shell/test_shell_commandline.py           |   2 +-
 37 files changed, 598 insertions(+), 597 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/cba93f1a/common/thrift/CatalogObjects.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/CatalogObjects.thrift b/common/thrift/CatalogObjects.thrift
index 10cb777..de89b51 100644
--- a/common/thrift/CatalogObjects.thrift
+++ b/common/thrift/CatalogObjects.thrift
@@ -357,8 +357,8 @@ struct TDataSourceTable {
   2: required string init_string
 }
 
-// Parameters needed for hash distribution
-struct TDistributeByHashParam {
+// Parameters needed for hash partitioning
+struct TKuduPartitionByHashParam {
   1: required list<string> columns
   2: required i32 num_buckets
 }
@@ -370,16 +370,16 @@ struct TRangePartition {
   4: optional bool is_upper_bound_inclusive
 }
 
-// A range distribution is identified by a list of columns and a list of range partitions.
-struct TDistributeByRangeParam {
+// A range partitioning is identified by a list of columns and a list of range partitions.
+struct TKuduPartitionByRangeParam {
   1: required list<string> columns
   2: optional list<TRangePartition> range_partitions
 }
 
-// Parameters for the DISTRIBUTE BY clause.
-struct TDistributeParam {
-  1: optional TDistributeByHashParam by_hash_param;
-  2: optional TDistributeByRangeParam by_range_param;
+// Parameters for the PARTITION BY clause.
+struct TKuduPartitionParam {
+  1: optional TKuduPartitionByHashParam by_hash_param;
+  2: optional TKuduPartitionByRangeParam by_range_param;
 }
 
 // Represents a Kudu table
@@ -392,8 +392,8 @@ struct TKuduTable {
   // Name of the key columns
   3: required list<string> key_columns
 
-  // Distribution schemes
-  4: required list<TDistributeParam> distribute_by
+  // Partitioning
+  4: required list<TKuduPartitionParam> partition_by
 }
 
 // Represents a table or view.
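
As a sketch of how a PARTITION BY clause maps onto these structs (column
names hypothetical): each HASH or RANGE term becomes one TKuduPartitionParam
with exactly one of by_hash_param/by_range_param set, e.g.

  PARTITION BY HASH (a) INTO 4 BUCKETS,
               HASH (b) INTO 2 BUCKETS,
               RANGE (c) (PARTITION VALUES < 10, PARTITION VALUE = 100)

is carried as three TKuduPartitionParam entries: two hash, one range.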

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/cba93f1a/common/thrift/JniCatalog.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/JniCatalog.thrift b/common/thrift/JniCatalog.thrift
index 8bb07e6..d224658 100644
--- a/common/thrift/JniCatalog.thrift
+++ b/common/thrift/JniCatalog.thrift
@@ -415,9 +415,9 @@ struct TCreateTableParams {
   // If set, the table will be cached after creation with details specified in cache_op.
   13: optional THdfsCachingOp cache_op
 
-  // If set, the table is automatically distributed according to this parameter.
+  // If set, the table is automatically partitioned according to this parameter.
   // Kudu-only.
-  14: optional list<CatalogObjects.TDistributeParam> distribute_by
+  14: optional list<CatalogObjects.TKuduPartitionParam> partition_by
 
   // Primary key column names (Kudu-only)
   15: optional list<string> primary_key_column_names;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/cba93f1a/fe/src/main/cup/sql-parser.cup
----------------------------------------------------------------------
diff --git a/fe/src/main/cup/sql-parser.cup b/fe/src/main/cup/sql-parser.cup
index a375c75..e09993b 100644
--- a/fe/src/main/cup/sql-parser.cup
+++ b/fe/src/main/cup/sql-parser.cup
@@ -401,21 +401,21 @@ nonterminal CreateTableAsSelectStmt create_tbl_as_select_stmt;
 nonterminal CreateTableLikeStmt create_tbl_like_stmt;
 nonterminal CreateTableStmt create_tbl_stmt;
 nonterminal TableDef tbl_def_without_col_defs, tbl_def_with_col_defs;
-nonterminal TableDataLayout opt_tbl_data_layout, distributed_data_layout;
+nonterminal TableDataLayout opt_tbl_data_layout, partitioned_data_layout;
 nonterminal TableDef.Options tbl_options;
 nonterminal CreateViewStmt create_view_stmt;
 nonterminal CreateDataSrcStmt create_data_src_stmt;
 nonterminal DropDataSrcStmt drop_data_src_stmt;
 nonterminal ShowDataSrcsStmt show_data_srcs_stmt;
 nonterminal StructField struct_field_def;
-nonterminal DistributeParam distribute_hash_param;
+nonterminal KuduPartitionParam hash_partition_param;
 nonterminal List<RangePartition> range_params_list;
 nonterminal RangePartition range_param;
 nonterminal Pair<Expr, Boolean> opt_lower_range_val,
    opt_upper_range_val;
-nonterminal ArrayList<DistributeParam> distribute_hash_param_list;
-nonterminal ArrayList<DistributeParam> distribute_param_list;
-nonterminal DistributeParam distribute_range_param;
+nonterminal ArrayList<KuduPartitionParam> hash_partition_param_list;
+nonterminal ArrayList<KuduPartitionParam> partition_param_list;
+nonterminal KuduPartitionParam range_partition_param;
 nonterminal ColumnDef column_def, view_column_def;
 nonterminal ArrayList<ColumnDef> column_def_list, partition_column_defs,
   view_column_def_list, view_column_defs;
@@ -1013,12 +1013,12 @@ create_tbl_as_select_stmt ::=
     // An optional clause cannot be used directly below because it would conflict with
     // the first rule in "create_tbl_stmt".
     primary_keys:primary_keys
-    distributed_data_layout:distribute_params
+    partitioned_data_layout:partition_params
     tbl_options:options
     KW_AS query_stmt:select_stmt
   {:
     tbl_def.getPrimaryKeyColumnNames().addAll(primary_keys);
-    tbl_def.getDistributeParams().addAll(distribute_params.getDistributeParams());
+    tbl_def.getKuduPartitionParams().addAll(partition_params.getKuduPartitionParams());
     tbl_def.setOptions(options);
     RESULT = new CreateTableAsSelectStmt(new CreateTableStmt(tbl_def), select_stmt, null);
   :}
@@ -1057,7 +1057,7 @@ create_tbl_stmt ::=
     tbl_options:options
   {:
     tbl_def.getPartitionColumnDefs().addAll(data_layout.getPartitionColumnDefs());
-    tbl_def.getDistributeParams().addAll(data_layout.getDistributeParams());
+    tbl_def.getKuduPartitionParams().addAll(data_layout.getKuduPartitionParams());
     tbl_def.setOptions(options);
     RESULT = new CreateTableStmt(tbl_def);
   :}
@@ -1082,7 +1082,7 @@ create_tbl_stmt ::=
     tbl_options:options
   {:
     tbl_def.getPartitionColumnDefs().addAll(data_layout.getPartitionColumnDefs());
-    tbl_def.getDistributeParams().addAll(data_layout.getDistributeParams());
+    tbl_def.getKuduPartitionParams().addAll(data_layout.getKuduPartitionParams());
     tbl_def.setOptions(options);
     RESULT = new CreateTableLikeFileStmt(new CreateTableStmt(tbl_def),
         schema_file_format, new HdfsUri(schema_location));
@@ -1148,13 +1148,13 @@ tbl_options ::=
 opt_tbl_data_layout ::=
   partition_column_defs:partition_column_defs
   {: RESULT = TableDataLayout.createPartitionedLayout(partition_column_defs); :}
-  | distributed_data_layout:data_layout
+  | partitioned_data_layout:data_layout
   {: RESULT = data_layout; :}
   ;
 
-distributed_data_layout ::=
-  distribute_param_list:distribute_params
-  {: RESULT = TableDataLayout.createDistributedLayout(distribute_params); :}
+partitioned_data_layout ::=
+  partition_param_list:partition_params
+  {: RESULT = TableDataLayout.createKuduPartitionedLayout(partition_params); :}
   | /* empty */
   {: RESULT = TableDataLayout.createEmptyLayout(); :}
   ;
@@ -1164,25 +1164,25 @@ partition_column_defs ::=
   {: RESULT = col_defs; :}
   ;
 
-// The DISTRIBUTE clause contains any number of HASH() clauses followed by exactly zero
+// The PARTITION BY clause contains any number of HASH() clauses followed by exactly zero
 // or one RANGE clauses
-distribute_param_list ::=
-  KW_DISTRIBUTE KW_BY distribute_hash_param_list:list
+partition_param_list ::=
+  KW_PARTITION KW_BY hash_partition_param_list:list
   {: RESULT = list; :}
-  | KW_DISTRIBUTE KW_BY distribute_range_param:rng
+  | KW_PARTITION KW_BY range_partition_param:rng
   {: RESULT = Lists.newArrayList(rng); :}
-  | KW_DISTRIBUTE KW_BY distribute_hash_param_list:list COMMA distribute_range_param:rng
+  | KW_PARTITION KW_BY hash_partition_param_list:list COMMA range_partition_param:rng
   {:
     list.add(rng);
     RESULT = list;
   :}
   ;
 
-// A list of HASH distribution clauses used for flexible partitioning
-distribute_hash_param_list ::=
-  distribute_hash_param:dc
+// A list of HASH partitioning clauses used for flexible partitioning
+hash_partition_param_list ::=
+  hash_partition_param:dc
   {: RESULT = Lists.newArrayList(dc); :}
-  | distribute_hash_param_list:list COMMA distribute_hash_param:d
+  | hash_partition_param_list:list COMMA hash_partition_param:d
   {:
     list.add(d);
     RESULT = list;
@@ -1190,26 +1190,26 @@ distribute_hash_param_list ::=
   ;
 
 // The column list for a HASH clause is optional.
-distribute_hash_param ::=
+hash_partition_param ::=
   KW_HASH LPAREN ident_list:cols RPAREN KW_INTO
     INTEGER_LITERAL:buckets KW_BUCKETS
-  {: RESULT = DistributeParam.createHashParam(cols, buckets.intValue()); :}
+  {: RESULT = KuduPartitionParam.createHashParam(cols, buckets.intValue()); :}
   | KW_HASH KW_INTO INTEGER_LITERAL:buckets KW_BUCKETS
   {:
-    RESULT = DistributeParam.createHashParam(Lists.<String>newArrayList(),
+    RESULT = KuduPartitionParam.createHashParam(Lists.<String>newArrayList(),
         buckets.intValue());
   :}
   ;
 
 // The column list for a RANGE clause is optional.
-distribute_range_param ::=
+range_partition_param ::=
   KW_RANGE LPAREN ident_list:cols RPAREN LPAREN range_params_list:ranges RPAREN
   {:
-    RESULT = DistributeParam.createRangeParam(cols, ranges);
+    RESULT = KuduPartitionParam.createRangeParam(cols, ranges);
   :}
   | KW_RANGE LPAREN range_params_list:ranges RPAREN
   {:
-    RESULT = DistributeParam.createRangeParam(Collections.<String>emptyList(), ranges);
+    RESULT = KuduPartitionParam.createRangeParam(Collections.<String>emptyList(), ranges);
   :}
   ;
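
For reference, the optional-column forms accepted above (column names
hypothetical); when a column list is omitted, analysis falls back to the
table's primary key columns (see analyzeKuduPartitionParams() below):

  PARTITION BY HASH INTO 16 BUCKETS
  PARTITION BY RANGE (PARTITION VALUES < 0, PARTITION 0 <= VALUES < 10)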
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/cba93f1a/fe/src/main/java/org/apache/impala/analysis/AlterTableAddDropRangePartitionStmt.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/AlterTableAddDropRangePartitionStmt.java b/fe/src/main/java/org/apache/impala/analysis/AlterTableAddDropRangePartitionStmt.java
index b1618e0..aee07f7 100644
--- a/fe/src/main/java/org/apache/impala/analysis/AlterTableAddDropRangePartitionStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/AlterTableAddDropRangePartitionStmt.java
@@ -93,10 +93,10 @@ public class AlterTableAddDropRangePartitionStmt extends AlterTableStmt {
           "partitions: RANGE %s", table.getFullName(), rangePartitionSpec_.toSql()));
     }
     KuduTable kuduTable = (KuduTable) table;
-    List<String> colNames = kuduTable.getRangeDistributionColNames();
+    List<String> colNames = kuduTable.getRangePartitioningColNames();
     if (colNames.isEmpty()) {
       throw new AnalysisException(String.format("Cannot add/drop partition %s: " +
-          "Kudu table %s doesn't have a range-based distribution.",
+          "Kudu table %s doesn't have a range-based partitioning.",
           rangePartitionSpec_.toSql(), kuduTable.getName()));
     }
     List<ColumnDef> rangeColDefs = Lists.newArrayListWithCapacity(colNames.size());
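
For illustration (table name hypothetical, message text approximate), adding
a range partition to a Kudu table that is only hash-partitioned now fails as:

  ALTER TABLE hash_only_tbl ADD RANGE PARTITION 10 <= VALUES < 20;
  -- AnalysisException: Cannot add/drop partition 10 <= VALUES < 20:
  -- Kudu table hash_only_tbl doesn't have a range-based partitioning.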

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/cba93f1a/fe/src/main/java/org/apache/impala/analysis/CreateTableAsSelectStmt.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/CreateTableAsSelectStmt.java b/fe/src/main/java/org/apache/impala/analysis/CreateTableAsSelectStmt.java
index cd7b1c8..eb492c6 100644
--- a/fe/src/main/java/org/apache/impala/analysis/CreateTableAsSelectStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/CreateTableAsSelectStmt.java
@@ -193,7 +193,8 @@ public class CreateTableAsSelectStmt extends StatementBase {
       Table tmpTable = null;
       if (KuduTable.isKuduTable(msTbl)) {
         tmpTable = KuduTable.createCtasTarget(db, msTbl, createStmt_.getColumnDefs(),
-            createStmt_.getTblPrimaryKeyColumnNames(), createStmt_.getDistributeParams());
+            createStmt_.getTblPrimaryKeyColumnNames(),
+            createStmt_.getKuduPartitionParams());
       } else {
         // TODO: Creating a tmp table using load() is confusing.
         // Refactor it to use a 'createCtasTarget()' function similar to Kudu table.
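
The CTAS form exercised here, for illustration (names hypothetical); per the
grammar above, PRIMARY KEY and PARTITION BY precede the table options and the
AS clause:

  CREATE TABLE t PRIMARY KEY (id)
  PARTITION BY HASH (id) INTO 4 BUCKETS
  STORED AS KUDU
  AS SELECT id, name FROM src;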

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/cba93f1a/fe/src/main/java/org/apache/impala/analysis/CreateTableStmt.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/CreateTableStmt.java b/fe/src/main/java/org/apache/impala/analysis/CreateTableStmt.java
index f2db81b..1139005 100644
--- a/fe/src/main/java/org/apache/impala/analysis/CreateTableStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/CreateTableStmt.java
@@ -88,8 +88,8 @@ public class CreateTableStmt extends StatementBase {
   public List<ColumnDef> getPartitionColumnDefs() {
     return tableDef_.getPartitionColumnDefs();
   }
-  public List<DistributeParam> getDistributeParams() {
-    return tableDef_.getDistributeParams();
+  public List<KuduPartitionParam> getKuduPartitionParams() {
+    return tableDef_.getKuduPartitionParams();
   }
   public String getComment() { return tableDef_.getComment(); }
   Map<String, String> getTblProperties() { return tableDef_.getTblProperties(); }
@@ -146,8 +146,8 @@ public class CreateTableStmt extends StatementBase {
     params.setIf_not_exists(getIfNotExists());
     params.setTable_properties(getTblProperties());
     params.setSerde_properties(getSerdeProperties());
-    for (DistributeParam d: getDistributeParams()) {
-      params.addToDistribute_by(d.toThrift());
+    for (KuduPartitionParam d: getKuduPartitionParams()) {
+      params.addToPartition_by(d.toThrift());
     }
     for (ColumnDef pkColDef: getPrimaryKeyColumnDefs()) {
       params.addToPrimary_key_column_names(pkColDef.getColName());
@@ -188,8 +188,8 @@ public class CreateTableStmt extends StatementBase {
           getTblProperties().get(KuduTable.KEY_STORAGE_HANDLER))) {
         throw new AnalysisException(KUDU_STORAGE_HANDLER_ERROR_MESSAGE);
       }
-      AnalysisUtils.throwIfNotEmpty(getDistributeParams(),
-          "Only Kudu tables can use the DISTRIBUTE BY clause.");
+      AnalysisUtils.throwIfNotEmpty(getKuduPartitionParams(),
+          "Only Kudu tables can use the PARTITION BY clause.");
       if (hasPrimaryKey()) {
         throw new AnalysisException("Only Kudu tables can specify a PRIMARY KEY.");
       }
@@ -263,8 +263,8 @@ public class CreateTableStmt extends StatementBase {
             KuduTable.KEY_TABLET_REPLICAS));
     AnalysisUtils.throwIfNotEmpty(getColumnDefs(),
         "Columns cannot be specified with an external Kudu table.");
-    AnalysisUtils.throwIfNotEmpty(getDistributeParams(),
-        "DISTRIBUTE BY cannot be used with an external Kudu table.");
+    AnalysisUtils.throwIfNotEmpty(getKuduPartitionParams(),
+        "PARTITION BY cannot be used with an external Kudu table.");
   }
 
   /**
@@ -305,29 +305,29 @@ public class CreateTableStmt extends StatementBase {
       }
     }
 
-    if (!getDistributeParams().isEmpty()) {
-      analyzeDistributeParams(analyzer);
+    if (!getKuduPartitionParams().isEmpty()) {
+      analyzeKuduPartitionParams(analyzer);
     } else {
-      throw new AnalysisException("Table distribution must be specified for " +
+      throw new AnalysisException("Table partitioning must be specified for " +
           "managed Kudu tables.");
     }
   }
 
   /**
-   * Analyzes the distribution schemes specified in the CREATE TABLE statement.
+   * Analyzes the partitioning schemes specified in the CREATE TABLE statement.
    */
-  private void analyzeDistributeParams(Analyzer analyzer) throws AnalysisException {
+  private void analyzeKuduPartitionParams(Analyzer analyzer) throws AnalysisException {
     Preconditions.checkState(getFileFormat() == THdfsFileFormat.KUDU);
     Map<String, ColumnDef> pkColDefsByName =
         ColumnDef.mapByColumnNames(getPrimaryKeyColumnDefs());
-    for (DistributeParam distributeParam: getDistributeParams()) {
-      // If no column names were specified in this distribution scheme, use all the
+    for (KuduPartitionParam partitionParam: getKuduPartitionParams()) {
+      // If no column names were specified in this partitioning scheme, use all the
       // primary key columns.
-      if (!distributeParam.hasColumnNames()) {
-        distributeParam.setColumnNames(pkColDefsByName.keySet());
+      if (!partitionParam.hasColumnNames()) {
+        partitionParam.setColumnNames(pkColDefsByName.keySet());
       }
-      distributeParam.setPkColumnDefMap(pkColDefsByName);
-      distributeParam.analyze(analyzer);
+      partitionParam.setPkColumnDefMap(pkColDefsByName);
+      partitionParam.analyze(analyzer);
     }
   }
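
Two analysis errors illustrating the renamed checks above (table names
hypothetical):

  -- PARTITION BY on a non-Kudu table:
  CREATE TABLE t1 (id INT)
  PARTITION BY HASH (id) INTO 4 BUCKETS STORED AS PARQUET;
  -- AnalysisException: Only Kudu tables can use the PARTITION BY clause.

  -- Managed Kudu table without a PARTITION BY clause:
  CREATE TABLE t2 (id BIGINT, PRIMARY KEY (id)) STORED AS KUDU;
  -- AnalysisException: Table partitioning must be specified for managed
  -- Kudu tables.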
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/cba93f1a/fe/src/main/java/org/apache/impala/analysis/DistributeParam.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/DistributeParam.java b/fe/src/main/java/org/apache/impala/analysis/DistributeParam.java
deleted file mode 100644
index 0eb1329..0000000
--- a/fe/src/main/java/org/apache/impala/analysis/DistributeParam.java
+++ /dev/null
@@ -1,211 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.impala.analysis;
-
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.impala.common.AnalysisException;
-import org.apache.impala.thrift.TDistributeByHashParam;
-import org.apache.impala.thrift.TDistributeByRangeParam;
-import org.apache.impala.thrift.TDistributeParam;
-import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-
-/**
- * Represents the distribution of a Kudu table as defined in the DISTRIBUTE BY
- * clause of a CREATE TABLE statement. The distribution can be hash-based or
- * range-based or both. See RangePartition for details on the supported range partitions.
- *
- * Examples:
- * - Hash-based:
- *   DISTRIBUTE BY HASH(id) INTO 10 BUCKETS
- * - Single column range-based:
- *   DISTRIBUTE BY RANGE(age)
- *   (
- *     PARTITION VALUES < 10,
- *     PARTITION 10 <= VALUES < 20,
- *     PARTITION 20 <= VALUES < 30,
- *     PARTITION VALUE = 100
- *   )
- * - Combination of hash and range based:
- *   DISTRIBUTE BY HASH (id) INTO 3 BUCKETS,
- *   RANGE (age)
- *   (
- *     PARTITION 10 <= VALUES < 20,
- *     PARTITION VALUE = 100
- *   )
- * - Multi-column range based:
- *   DISTRIBUTE BY RANGE (year, quarter)
- *   (
- *     PARTITION VALUE = (2001, 1),
- *     PARTITION VALUE = (2001, 2),
- *     PARTITION VALUE = (2002, 1)
- *   )
- *
- */
-public class DistributeParam implements ParseNode {
-
-  /**
-   * Creates a hash-based DistributeParam.
-   */
-  public static DistributeParam createHashParam(List<String> cols, int buckets) {
-    return new DistributeParam(Type.HASH, cols, buckets, null);
-  }
-
-  /**
-   * Creates a range-based DistributeParam.
-   */
-  public static DistributeParam createRangeParam(List<String> cols,
-      List<RangePartition> rangePartitions) {
-    return new DistributeParam(Type.RANGE, cols, NO_BUCKETS, rangePartitions);
-  }
-
-  private static final int NO_BUCKETS = -1;
-
-  /**
-   * The distribution type.
-   */
-  public enum Type {
-    HASH, RANGE
-  }
-
-  // Columns of this distribution. If no columns are specified, all
-  // the primary key columns of the associated table are used.
-  private final List<String> colNames_ = Lists.newArrayList();
-
-  // Map of primary key column names to the associated column definitions. Must be set
-  // before the call to analyze().
-  private Map<String, ColumnDef> pkColumnDefByName_;
-
-  // Distribution scheme type
-  private final Type type_;
-
-  // Only relevant for hash-based distribution, -1 otherwise
-  private final int numBuckets_;
-
-  // List of range partitions specified in a range-based distribution.
-  private List<RangePartition> rangePartitions_;
-
-  private DistributeParam(Type t, List<String> colNames, int buckets,
-      List<RangePartition> partitions) {
-    type_ = t;
-    for (String name: colNames) colNames_.add(name.toLowerCase());
-    rangePartitions_ = partitions;
-    numBuckets_ = buckets;
-  }
-
-  @Override
-  public void analyze(Analyzer analyzer) throws AnalysisException {
-    Preconditions.checkState(!colNames_.isEmpty());
-    Preconditions.checkNotNull(pkColumnDefByName_);
-    Preconditions.checkState(!pkColumnDefByName_.isEmpty());
-    // Validate that the columns specified in this distribution are primary key columns.
-    for (String colName: colNames_) {
-      if (!pkColumnDefByName_.containsKey(colName)) {
-        throw new AnalysisException(String.format("Column '%s' in '%s' is not a key " +
-            "column. Only key columns can be used in DISTRIBUTE BY.", colName, toSql()));
-      }
-    }
-    if (type_ == Type.RANGE) analyzeRangeParam(analyzer);
-  }
-
-  /**
-   * Analyzes a range-based distribution. This function does not check for overlapping
-   * range partitions; these checks are performed by Kudu and an error is reported back
-   * to the user.
-   */
-  public void analyzeRangeParam(Analyzer analyzer) throws AnalysisException {
-    List<ColumnDef> pkColDefs = Lists.newArrayListWithCapacity(colNames_.size());
-    for (String colName: colNames_) pkColDefs.add(pkColumnDefByName_.get(colName));
-    for (RangePartition rangePartition: rangePartitions_) {
-      rangePartition.analyze(analyzer, pkColDefs);
-    }
-  }
-
-  @Override
-  public String toSql() {
-    StringBuilder builder = new StringBuilder(type_.toString());
-    if (!colNames_.isEmpty()) {
-      builder.append(" (");
-      Joiner.on(", ").appendTo(builder, colNames_).append(")");
-    }
-    if (type_ == Type.HASH) {
-      builder.append(" INTO ");
-      Preconditions.checkState(numBuckets_ != NO_BUCKETS);
-      builder.append(numBuckets_).append(" BUCKETS");
-    } else {
-      builder.append(" (");
-      if (rangePartitions_ != null) {
-        List<String> partsSql = Lists.newArrayList();
-        for (RangePartition rangePartition: rangePartitions_) {
-          partsSql.add(rangePartition.toSql());
-        }
-        builder.append(Joiner.on(", ").join(partsSql));
-      } else {
-        builder.append("...");
-      }
-      builder.append(")");
-    }
-    return builder.toString();
-  }
-
-  @Override
-  public String toString() { return toSql(); }
-
-  public TDistributeParam toThrift() {
-    TDistributeParam result = new TDistributeParam();
-    // TODO: Add a validate() function to ensure the validity of distribute params.
-    if (type_ == Type.HASH) {
-      TDistributeByHashParam hash = new TDistributeByHashParam();
-      Preconditions.checkState(numBuckets_ != NO_BUCKETS);
-      hash.setNum_buckets(numBuckets_);
-      hash.setColumns(colNames_);
-      result.setBy_hash_param(hash);
-    } else {
-      Preconditions.checkState(type_ == Type.RANGE);
-      TDistributeByRangeParam rangeParam = new TDistributeByRangeParam();
-      rangeParam.setColumns(colNames_);
-      if (rangePartitions_ == null) {
-        result.setBy_range_param(rangeParam);
-        return result;
-      }
-      for (RangePartition rangePartition: rangePartitions_) {
-        rangeParam.addToRange_partitions(rangePartition.toThrift());
-      }
-      result.setBy_range_param(rangeParam);
-    }
-    return result;
-  }
-
-  void setPkColumnDefMap(Map<String, ColumnDef> pkColumnDefByName) {
-    pkColumnDefByName_ = pkColumnDefByName;
-  }
-
-  boolean hasColumnNames() { return !colNames_.isEmpty(); }
-  public List<String> getColumnNames() { return ImmutableList.copyOf(colNames_); }
-  void setColumnNames(Collection<String> colNames) {
-    Preconditions.checkState(colNames_.isEmpty());
-    colNames_.addAll(colNames);
-  }
-
-  public Type getType() { return type_; }
-}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/cba93f1a/fe/src/main/java/org/apache/impala/analysis/KuduPartitionParam.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/KuduPartitionParam.java b/fe/src/main/java/org/apache/impala/analysis/KuduPartitionParam.java
new file mode 100644
index 0000000..3f69fae
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/analysis/KuduPartitionParam.java
@@ -0,0 +1,211 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.analysis;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.impala.common.AnalysisException;
+import org.apache.impala.thrift.TKuduPartitionByHashParam;
+import org.apache.impala.thrift.TKuduPartitionByRangeParam;
+import org.apache.impala.thrift.TKuduPartitionParam;
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+
+/**
+ * Represents the partitioning of a Kudu table as defined in the PARTITION BY
+ * clause of a CREATE TABLE statement. The partitioning can be hash-based or
+ * range-based or both. See RangePartition for details on the supported range partitions.
+ *
+ * Examples:
+ * - Hash-based:
+ *   PARTITION BY HASH(id) INTO 10 BUCKETS
+ * - Single column range-based:
+ *   PARTITION BY RANGE(age)
+ *   (
+ *     PARTITION VALUES < 10,
+ *     PARTITION 10 <= VALUES < 20,
+ *     PARTITION 20 <= VALUES < 30,
+ *     PARTITION VALUE = 100
+ *   )
+ * - Combination of hash and range based:
+ *   PARTITION BY HASH (id) INTO 3 BUCKETS,
+ *   RANGE (age)
+ *   (
+ *     PARTITION 10 <= VALUES < 20,
+ *     PARTITION VALUE = 100
+ *   )
+ * - Multi-column range based:
+ *   PARTITION BY RANGE (year, quarter)
+ *   (
+ *     PARTITION VALUE = (2001, 1),
+ *     PARTITION VALUE = (2001, 2),
+ *     PARTITION VALUE = (2002, 1)
+ *   )
+ *
+ */
+public class KuduPartitionParam implements ParseNode {
+
+  /**
+   * Creates a hash-based KuduPartitionParam.
+   */
+  public static KuduPartitionParam createHashParam(List<String> cols, int buckets) {
+    return new KuduPartitionParam(Type.HASH, cols, buckets, null);
+  }
+
+  /**
+   * Creates a range-based KuduPartitionParam.
+   */
+  public static KuduPartitionParam createRangeParam(List<String> cols,
+      List<RangePartition> rangePartitions) {
+    return new KuduPartitionParam(Type.RANGE, cols, NO_BUCKETS, rangePartitions);
+  }
+
+  private static final int NO_BUCKETS = -1;
+
+  /**
+   * The partitioning type.
+   */
+  public enum Type {
+    HASH, RANGE
+  }
+
+  // Columns of this partitioning. If no columns are specified, all
+  // the primary key columns of the associated table are used.
+  private final List<String> colNames_ = Lists.newArrayList();
+
+  // Map of primary key column names to the associated column definitions. Must be set
+  // before the call to analyze().
+  private Map<String, ColumnDef> pkColumnDefByName_;
+
+  // Partitioning scheme type
+  private final Type type_;
+
+  // Only relevant for hash-based partitioning, -1 otherwise
+  private final int numBuckets_;
+
+  // List of range partitions specified in a range-based partitioning.
+  private List<RangePartition> rangePartitions_;
+
+  private KuduPartitionParam(Type t, List<String> colNames, int buckets,
+      List<RangePartition> partitions) {
+    type_ = t;
+    for (String name: colNames) colNames_.add(name.toLowerCase());
+    rangePartitions_ = partitions;
+    numBuckets_ = buckets;
+  }
+
+  @Override
+  public void analyze(Analyzer analyzer) throws AnalysisException {
+    Preconditions.checkState(!colNames_.isEmpty());
+    Preconditions.checkNotNull(pkColumnDefByName_);
+    Preconditions.checkState(!pkColumnDefByName_.isEmpty());
+    // Validate that the columns specified in this partitioning are primary key columns.
+    for (String colName: colNames_) {
+      if (!pkColumnDefByName_.containsKey(colName)) {
+        throw new AnalysisException(String.format("Column '%s' in '%s' is not a key " +
+            "column. Only key columns can be used in PARTITION BY.", colName, toSql()));
+      }
+    }
+    if (type_ == Type.RANGE) analyzeRangeParam(analyzer);
+  }
+
+  /**
+   * Analyzes a range-based partitioning. This function does not check for overlapping
+   * range partitions; these checks are performed by Kudu and an error is reported back
+   * to the user.
+   */
+  public void analyzeRangeParam(Analyzer analyzer) throws AnalysisException {
+    List<ColumnDef> pkColDefs = Lists.newArrayListWithCapacity(colNames_.size());
+    for (String colName: colNames_) pkColDefs.add(pkColumnDefByName_.get(colName));
+    for (RangePartition rangePartition: rangePartitions_) {
+      rangePartition.analyze(analyzer, pkColDefs);
+    }
+  }
+
+  @Override
+  public String toSql() {
+    StringBuilder builder = new StringBuilder(type_.toString());
+    if (!colNames_.isEmpty()) {
+      builder.append(" (");
+      Joiner.on(", ").appendTo(builder, colNames_).append(")");
+    }
+    if (type_ == Type.HASH) {
+      builder.append(" INTO ");
+      Preconditions.checkState(numBuckets_ != NO_BUCKETS);
+      builder.append(numBuckets_).append(" BUCKETS");
+    } else {
+      builder.append(" (");
+      if (rangePartitions_ != null) {
+        List<String> partsSql = Lists.newArrayList();
+        for (RangePartition rangePartition: rangePartitions_) {
+          partsSql.add(rangePartition.toSql());
+        }
+        builder.append(Joiner.on(", ").join(partsSql));
+      } else {
+        builder.append("...");
+      }
+      builder.append(")");
+    }
+    return builder.toString();
+  }
+
+  @Override
+  public String toString() { return toSql(); }
+
+  public TKuduPartitionParam toThrift() {
+    TKuduPartitionParam result = new TKuduPartitionParam();
+    // TODO: Add a validate() function to ensure the validity of partition params.
+    if (type_ == Type.HASH) {
+      TKuduPartitionByHashParam hash = new TKuduPartitionByHashParam();
+      Preconditions.checkState(numBuckets_ != NO_BUCKETS);
+      hash.setNum_buckets(numBuckets_);
+      hash.setColumns(colNames_);
+      result.setBy_hash_param(hash);
+    } else {
+      Preconditions.checkState(type_ == Type.RANGE);
+      TKuduPartitionByRangeParam rangeParam = new TKuduPartitionByRangeParam();
+      rangeParam.setColumns(colNames_);
+      if (rangePartitions_ == null) {
+        result.setBy_range_param(rangeParam);
+        return result;
+      }
+      for (RangePartition rangePartition: rangePartitions_) {
+        rangeParam.addToRange_partitions(rangePartition.toThrift());
+      }
+      result.setBy_range_param(rangeParam);
+    }
+    return result;
+  }
+
+  void setPkColumnDefMap(Map<String, ColumnDef> pkColumnDefByName) {
+    pkColumnDefByName_ = pkColumnDefByName;
+  }
+
+  boolean hasColumnNames() { return !colNames_.isEmpty(); }
+  public List<String> getColumnNames() { return ImmutableList.copyOf(colNames_); }
+  void setColumnNames(Collection<String> colNames) {
+    Preconditions.checkState(colNames_.isEmpty());
+    colNames_.addAll(colNames);
+  }
+
+  public Type getType() { return type_; }
+}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/cba93f1a/fe/src/main/java/org/apache/impala/analysis/RangePartition.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/RangePartition.java b/fe/src/main/java/org/apache/impala/analysis/RangePartition.java
index 3e41bc4..e2640a8 100644
--- a/fe/src/main/java/org/apache/impala/analysis/RangePartition.java
+++ b/fe/src/main/java/org/apache/impala/analysis/RangePartition.java
@@ -110,26 +110,26 @@ public class RangePartition implements ParseNode {
     throw new IllegalStateException("Not implemented");
   }
 
-  public void analyze(Analyzer analyzer, List<ColumnDef> distributionColDefs)
+  public void analyze(Analyzer analyzer, List<ColumnDef> partColDefs)
       throws AnalysisException {
-    analyzeBoundaryValues(lowerBound_, distributionColDefs, analyzer);
+    analyzeBoundaryValues(lowerBound_, partColDefs, analyzer);
     if (!isSingletonRange_) {
-      analyzeBoundaryValues(upperBound_, distributionColDefs, analyzer);
+      analyzeBoundaryValues(upperBound_, partColDefs, analyzer);
     }
   }
 
   private void analyzeBoundaryValues(List<Expr> boundaryValues,
-      List<ColumnDef> distributionColDefs, Analyzer analyzer) throws AnalysisException {
+      List<ColumnDef> partColDefs, Analyzer analyzer) throws AnalysisException {
     if (!boundaryValues.isEmpty()
-        && boundaryValues.size() != distributionColDefs.size()) {
+        && boundaryValues.size() != partColDefs.size()) {
       throw new AnalysisException(String.format("Number of specified range " +
-          "partition values is different than the number of distribution " +
+          "partition values is different than the number of partitioning " +
           "columns: (%d vs %d). Range partition: '%s'", boundaryValues.size(),
-          distributionColDefs.size(), toSql()));
+          partColDefs.size(), toSql()));
     }
     for (int i = 0; i < boundaryValues.size(); ++i) {
       LiteralExpr literal = analyzeBoundaryValue(boundaryValues.get(i),
-          distributionColDefs.get(i), analyzer);
+          partColDefs.get(i), analyzer);
       Preconditions.checkNotNull(literal);
       boundaryValues.set(i, literal);
     }
@@ -162,7 +162,7 @@ public class RangePartition implements ParseNode {
     if (!org.apache.impala.catalog.Type.isImplicitlyCastable(literalType, colType,
         true)) {
       throw new AnalysisException(String.format("Range partition value %s " +
-          "(type: %s) is not type compatible with distribution column '%s' (type: %s).",
+          "(type: %s) is not type compatible with partitioning column '%s' (type: %s).",
           literal.toSql(), literalType, pkColumn.getColName(), colType.toSql()));
     }
     if (!literalType.equals(colType)) {
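
For illustration (names hypothetical), the reworded count check above now
reports, e.g.:

  CREATE TABLE t (year INT, quarter INT, PRIMARY KEY (year, quarter))
  PARTITION BY RANGE (year, quarter) (PARTITION VALUE = (2001))
  STORED AS KUDU;
  -- AnalysisException: Number of specified range partition values is
  -- different than the number of partitioning columns: (1 vs 2). ...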

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/cba93f1a/fe/src/main/java/org/apache/impala/analysis/TableDataLayout.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/TableDataLayout.java b/fe/src/main/java/org/apache/impala/analysis/TableDataLayout.java
index 4d3ed80..aef5732 100644
--- a/fe/src/main/java/org/apache/impala/analysis/TableDataLayout.java
+++ b/fe/src/main/java/org/apache/impala/analysis/TableDataLayout.java
@@ -22,35 +22,34 @@ import com.google.common.collect.Lists;
 import java.util.List;
 
 /**
- * Represents the PARTITION BY and DISTRIBUTED BY clauses of a DDL statement.
- * TODO: Reconsider this class when we add support for new range partitioning syntax (see
- * IMPALA-3724).
+ * Represents the PARTITION BY and PARTITIONED BY clauses of a DDL statement.
  */
 class TableDataLayout {
 
   private final List<ColumnDef> partitionColDefs_;
-  private final List<DistributeParam> distributeParams_;
+  private final List<KuduPartitionParam> kuduPartitionParams_;
 
   private TableDataLayout(List<ColumnDef> partitionColumnDefs,
-      List<DistributeParam> distributeParams) {
+      List<KuduPartitionParam> partitionParams) {
     partitionColDefs_ = partitionColumnDefs;
-    distributeParams_ = distributeParams;
+    kuduPartitionParams_ = partitionParams;
   }
 
   static TableDataLayout createPartitionedLayout(List<ColumnDef> partitionColumnDefs) {
     return new TableDataLayout(partitionColumnDefs,
-        Lists.<DistributeParam>newArrayList());
+        Lists.<KuduPartitionParam>newArrayList());
   }
 
-  static TableDataLayout createDistributedLayout(List<DistributeParam> distributeParams) {
-    return new TableDataLayout(Lists.<ColumnDef>newArrayList(), distributeParams);
+  static TableDataLayout createKuduPartitionedLayout(
+      List<KuduPartitionParam> partitionParams) {
+    return new TableDataLayout(Lists.<ColumnDef>newArrayList(), partitionParams);
   }
 
   static TableDataLayout createEmptyLayout() {
     return new TableDataLayout(Lists.<ColumnDef>newArrayList(),
-        Lists.<DistributeParam>newArrayList());
+        Lists.<KuduPartitionParam>newArrayList());
   }
 
   List<ColumnDef> getPartitionColumnDefs() { return partitionColDefs_; }
-  List<DistributeParam> getDistributeParams() { return distributeParams_; }
+  List<KuduPartitionParam> getKuduPartitionParams() { return kuduPartitionParams_; }
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/cba93f1a/fe/src/main/java/org/apache/impala/analysis/TableDef.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/TableDef.java b/fe/src/main/java/org/apache/impala/analysis/TableDef.java
index 1c16954..a40b4d2 100644
--- a/fe/src/main/java/org/apache/impala/analysis/TableDef.java
+++ b/fe/src/main/java/org/apache/impala/analysis/TableDef.java
@@ -74,7 +74,7 @@ class TableDef {
   // If true, no errors are thrown if the table already exists.
   private final boolean ifNotExists_;
 
-  // Partitioned/distribute by parameters.
+  // Partitioning parameters.
   private final TableDataLayout dataLayout_;
 
   // True if analyze() has been called.
@@ -152,8 +152,8 @@ class TableDef {
   List<ColumnDef> getPrimaryKeyColumnDefs() { return primaryKeyColDefs_; }
   boolean isExternal() { return isExternal_; }
   boolean getIfNotExists() { return ifNotExists_; }
-  List<DistributeParam> getDistributeParams() {
-    return dataLayout_.getDistributeParams();
+  List<KuduPartitionParam> getKuduPartitionParams() {
+    return dataLayout_.getKuduPartitionParams();
   }
   void setOptions(Options options) {
     Preconditions.checkNotNull(options);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/cba93f1a/fe/src/main/java/org/apache/impala/analysis/ToSqlUtils.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/ToSqlUtils.java b/fe/src/main/java/org/apache/impala/analysis/ToSqlUtils.java
index 4cd095c..be4ab6f 100644
--- a/fe/src/main/java/org/apache/impala/analysis/ToSqlUtils.java
+++ b/fe/src/main/java/org/apache/impala/analysis/ToSqlUtils.java
@@ -197,7 +197,7 @@ public class ToSqlUtils {
 
     String storageHandlerClassName = table.getStorageHandlerClassName();
     List<String> primaryKeySql = Lists.newArrayList();
-    String kuduDistributeByParams = null;
+    String kuduPartitionByParams = null;
     if (table instanceof KuduTable) {
       KuduTable kuduTable = (KuduTable) table;
       // Kudu tables don't use LOCATION syntax
@@ -219,10 +219,10 @@ public class ToSqlUtils {
         primaryKeySql.addAll(kuduTable.getPrimaryKeyColumnNames());
 
         List<String> paramsSql = Lists.newArrayList();
-        for (DistributeParam param: kuduTable.getDistributeBy()) {
+        for (KuduPartitionParam param: kuduTable.getPartitionBy()) {
           paramsSql.add(param.toSql());
         }
-        kuduDistributeByParams = Joiner.on(", ").join(paramsSql);
+        kuduPartitionByParams = Joiner.on(", ").join(paramsSql);
       } else {
         // We shouldn't output the columns for external tables
         colsSql = null;
@@ -230,7 +230,7 @@ public class ToSqlUtils {
     }
     HdfsUri tableLocation = location == null ? null : new HdfsUri(location);
     return getCreateTableSql(table.getDb().getName(), table.getName(), comment, colsSql,
-        partitionColsSql, primaryKeySql, kuduDistributeByParams, properties,
+        partitionColsSql, primaryKeySql, kuduPartitionByParams, properties,
         serdeParameters, isExternal, false, rowFormat, format, compression,
         storageHandlerClassName, tableLocation);
   }
@@ -242,7 +242,7 @@ public class ToSqlUtils {
    */
   public static String getCreateTableSql(String dbName, String tableName,
       String tableComment, List<String> columnsSql, List<String> partitionColumnsSql,
-      List<String> primaryKeysSql, String kuduDistributeByParams,
+      List<String> primaryKeysSql, String kuduPartitionByParams,
       Map<String, String> tblProperties, Map<String, String> serdeParameters,
       boolean isExternal, boolean ifNotExists, RowFormat rowFormat,
       HdfsFileFormat fileFormat, HdfsCompression compression, String storageHandlerClass,
@@ -271,8 +271,8 @@ public class ToSqlUtils {
           Joiner.on(", \n  ").join(partitionColumnsSql)));
     }
 
-    if (kuduDistributeByParams != null) {
-      sb.append("DISTRIBUTE BY " + kuduDistributeByParams + "\n");
+    if (kuduPartitionByParams != null) {
+      sb.append("PARTITION BY " + kuduPartitionByParams + "\n");
     }
 
     if (rowFormat != null && !rowFormat.isDefault()) {
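
With this change, the generated CREATE TABLE SQL for a Kudu table carries a
PARTITION BY line, roughly (names hypothetical):

  ...
  PARTITION BY HASH (id) INTO 4 BUCKETS, RANGE (id) (...)
  ...

Range partitions of an existing table print as "(...)" because Kudu's API
does not yet expose the split values (see the "..." branch in
KuduPartitionParam.toSql() above and KuduTable.loadPartitionByParams() below).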

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/cba93f1a/fe/src/main/java/org/apache/impala/catalog/KuduTable.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/KuduTable.java b/fe/src/main/java/org/apache/impala/catalog/KuduTable.java
index a7f72c3..9bbcbd5 100644
--- a/fe/src/main/java/org/apache/impala/catalog/KuduTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/KuduTable.java
@@ -29,15 +29,15 @@ import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.impala.analysis.ColumnDef;
-import org.apache.impala.analysis.DistributeParam;
+import org.apache.impala.analysis.KuduPartitionParam;
 import org.apache.impala.common.ImpalaRuntimeException;
 import org.apache.impala.service.BackendConfig;
 import org.apache.impala.service.CatalogOpExecutor;
 import org.apache.impala.thrift.TCatalogObjectType;
 import org.apache.impala.thrift.TColumn;
-import org.apache.impala.thrift.TDistributeByHashParam;
-import org.apache.impala.thrift.TDistributeByRangeParam;
-import org.apache.impala.thrift.TDistributeParam;
+import org.apache.impala.thrift.TKuduPartitionByHashParam;
+import org.apache.impala.thrift.TKuduPartitionByRangeParam;
+import org.apache.impala.thrift.TKuduPartitionParam;
 import org.apache.impala.thrift.TKuduTable;
 import org.apache.impala.thrift.TResultSet;
 import org.apache.impala.thrift.TResultSetMetadata;
@@ -110,9 +110,9 @@ public class KuduTable extends Table {
   // Primary key column names.
   private final List<String> primaryKeyColumnNames_ = Lists.newArrayList();
 
-  // Distribution schemes of this Kudu table. Both range and hash-based distributions are
+  // Partitioning schemes of this Kudu table. Both range and hash-based partitioning are
   // supported.
-  private final List<DistributeParam> distributeBy_ = Lists.newArrayList();
+  private final List<KuduPartitionParam> partitionBy_ = Lists.newArrayList();
 
   // Schema of the underlying Kudu table.
   private org.apache.kudu.Schema kuduSchema_;
@@ -148,36 +148,36 @@ public class KuduTable extends Table {
     return ImmutableList.copyOf(primaryKeyColumnNames_);
   }
 
-  public List<DistributeParam> getDistributeBy() {
-    return ImmutableList.copyOf(distributeBy_);
+  public List<KuduPartitionParam> getPartitionBy() {
+    return ImmutableList.copyOf(partitionBy_);
   }
 
   /**
-   * Returns the range-based distribution of this table if it exists, null otherwise.
+   * Returns the range-based partitioning of this table if it exists, null otherwise.
    */
-  private DistributeParam getRangeDistribution() {
-    for (DistributeParam distributeParam: distributeBy_) {
-      if (distributeParam.getType() == DistributeParam.Type.RANGE) {
-        return distributeParam;
+  private KuduPartitionParam getRangePartitioning() {
+    for (KuduPartitionParam partitionParam: partitionBy_) {
+      if (partitionParam.getType() == KuduPartitionParam.Type.RANGE) {
+        return partitionParam;
       }
     }
     return null;
   }
 
   /**
-   * Returns the column names of the table's range-based distribution or an empty
-   * list if the table doesn't have a range-based distribution.
+   * Returns the column names of the table's range-based partitioning or an empty
+   * list if the table doesn't have a range-based partitioning.
    */
-  public List<String> getRangeDistributionColNames() {
-    DistributeParam rangeDistribution = getRangeDistribution();
-    if (rangeDistribution == null) return Collections.<String>emptyList();
-    return rangeDistribution.getColumnNames();
+  public List<String> getRangePartitioningColNames() {
+    KuduPartitionParam rangePartitioning = getRangePartitioning();
+    if (rangePartitioning == null) return Collections.<String>emptyList();
+    return rangePartitioning.getColumnNames();
   }
 
   /**
    * Loads the metadata of a Kudu table.
    *
-   * Schema and distribution schemes are loaded directly from Kudu whereas column stats
+   * Schema and partitioning schemes are loaded directly from Kudu whereas column stats
    * are loaded from HMS. The function also updates the table schema in HMS in order to
    * propagate alterations made to the Kudu table to HMS.
    */
@@ -209,7 +209,7 @@ public class KuduTable extends Table {
     // Load metadata from Kudu and HMS
     try {
       loadSchema(kuduTable);
-      loadDistributeByParams(kuduTable);
+      loadPartitionByParams(kuduTable);
       loadAllColumnStats(msClient);
     } catch (ImpalaRuntimeException e) {
       throw new TableLoadingException("Error loading metadata for Kudu table " +
@@ -255,19 +255,19 @@ public class KuduTable extends Table {
     }
   }
 
-  private void loadDistributeByParams(org.apache.kudu.client.KuduTable kuduTable) {
+  private void loadPartitionByParams(org.apache.kudu.client.KuduTable kuduTable) {
     Preconditions.checkNotNull(kuduTable);
     Schema tableSchema = kuduTable.getSchema();
     PartitionSchema partitionSchema = kuduTable.getPartitionSchema();
     Preconditions.checkState(!colsByPos_.isEmpty());
-    distributeBy_.clear();
+    partitionBy_.clear();
     for (HashBucketSchema hashBucketSchema: partitionSchema.getHashBucketSchemas()) {
       List<String> columnNames = Lists.newArrayList();
       for (int colId: hashBucketSchema.getColumnIds()) {
         columnNames.add(getColumnNameById(tableSchema, colId));
       }
-      distributeBy_.add(
-          DistributeParam.createHashParam(columnNames, hashBucketSchema.getNumBuckets()));
+      partitionBy_.add(KuduPartitionParam.createHashParam(columnNames,
+          hashBucketSchema.getNumBuckets()));
     }
     RangeSchema rangeSchema = partitionSchema.getRangeSchema();
     List<Integer> columnIds = rangeSchema.getColumns();
@@ -277,7 +277,7 @@ public class KuduTable extends Table {
     // We don't populate the split values because Kudu's API doesn't currently support
     // retrieving the split values for range partitions.
     // TODO: File a Kudu JIRA.
-    distributeBy_.add(DistributeParam.createRangeParam(columnNames, null));
+    partitionBy_.add(KuduPartitionParam.createRangeParam(columnNames, null));
   }
 
   /**
@@ -297,14 +297,14 @@ public class KuduTable extends Table {
    */
   public static KuduTable createCtasTarget(Db db,
       org.apache.hadoop.hive.metastore.api.Table msTbl, List<ColumnDef> columnDefs,
-      List<String> primaryKeyColumnNames, List<DistributeParam> distributeParams) {
+      List<String> primaryKeyColumnNames, List<KuduPartitionParam> partitionParams) {
     KuduTable tmpTable = new KuduTable(msTbl, db, msTbl.getTableName(), msTbl.getOwner());
     int pos = 0;
     for (ColumnDef colDef: columnDefs) {
       tmpTable.addColumn(new Column(colDef.getColName(), colDef.getType(), pos++));
     }
     tmpTable.primaryKeyColumnNames_.addAll(primaryKeyColumnNames);
-    tmpTable.distributeBy_.addAll(distributeParams);
+    tmpTable.partitionBy_.addAll(partitionParams);
     return tmpTable;
   }
 
@@ -324,27 +324,28 @@ public class KuduTable extends Table {
     kuduMasters_ = Joiner.on(',').join(tkudu.getMaster_addresses());
     primaryKeyColumnNames_.clear();
     primaryKeyColumnNames_.addAll(tkudu.getKey_columns());
-    loadDistributeByParamsFromThrift(tkudu.getDistribute_by());
+    loadPartitionByParamsFromThrift(tkudu.getPartition_by());
   }
 
-  private void loadDistributeByParamsFromThrift(List<TDistributeParam> params) {
-    distributeBy_.clear();
-    for (TDistributeParam param: params) {
+  private void loadPartitionByParamsFromThrift(List<TKuduPartitionParam> params) {
+    partitionBy_.clear();
+    for (TKuduPartitionParam param: params) {
       if (param.isSetBy_hash_param()) {
-        TDistributeByHashParam hashParam = param.getBy_hash_param();
-        distributeBy_.add(DistributeParam.createHashParam(
+        TKuduPartitionByHashParam hashParam = param.getBy_hash_param();
+        partitionBy_.add(KuduPartitionParam.createHashParam(
             hashParam.getColumns(), hashParam.getNum_buckets()));
       } else {
         Preconditions.checkState(param.isSetBy_range_param());
-        TDistributeByRangeParam rangeParam = param.getBy_range_param();
-        distributeBy_.add(DistributeParam.createRangeParam(rangeParam.getColumns(),
+        TKuduPartitionByRangeParam rangeParam = param.getBy_range_param();
+        partitionBy_.add(KuduPartitionParam.createRangeParam(rangeParam.getColumns(),
             null));
       }
     }
   }
 
   @Override
-  public TTableDescriptor toThriftDescriptor(int tableId, Set<Long> referencedPartitions) {
+  public TTableDescriptor toThriftDescriptor(int tableId,
+      Set<Long> referencedPartitions) {
     TTableDescriptor desc = new TTableDescriptor(tableId, TTableType.KUDU_TABLE,
         getTColumnDescriptors(), numClusteringCols_, kuduTableName_, db_.getName());
     desc.setKuduTable(getTKuduTable());
@@ -356,9 +357,9 @@ public class KuduTable extends Table {
     tbl.setKey_columns(Preconditions.checkNotNull(primaryKeyColumnNames_));
     tbl.setMaster_addresses(Lists.newArrayList(kuduMasters_.split(",")));
     tbl.setTable_name(kuduTableName_);
-    Preconditions.checkNotNull(distributeBy_);
-    for (DistributeParam distributeParam: distributeBy_) {
-      tbl.addToDistribute_by(distributeParam.toThrift());
+    Preconditions.checkNotNull(partitionBy_);
+    for (KuduPartitionParam partitionParam: partitionBy_) {
+      tbl.addToPartition_by(partitionParam.toThrift());
     }
     return tbl;
   }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/cba93f1a/fe/src/main/java/org/apache/impala/service/KuduCatalogOpExecutor.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/service/KuduCatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/KuduCatalogOpExecutor.java
index 0f8f8fd..b0616c4 100644
--- a/fe/src/main/java/org/apache/impala/service/KuduCatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/KuduCatalogOpExecutor.java
@@ -34,7 +34,7 @@ import org.apache.impala.common.Pair;
 import org.apache.impala.thrift.TAlterTableAddDropRangePartitionParams;
 import org.apache.impala.thrift.TColumn;
 import org.apache.impala.thrift.TCreateTableParams;
-import org.apache.impala.thrift.TDistributeParam;
+import org.apache.impala.thrift.TKuduPartitionParam;
 import org.apache.impala.thrift.TRangePartition;
 import org.apache.impala.thrift.TRangePartitionOperationType;
 import org.apache.impala.util.KuduUtil;
@@ -138,22 +138,22 @@ public class KuduCatalogOpExecutor {
       org.apache.hadoop.hive.metastore.api.Table msTbl,
       TCreateTableParams params, Schema schema) throws ImpalaRuntimeException {
     CreateTableOptions tableOpts = new CreateTableOptions();
-    // Set the distribution schemes
-    List<TDistributeParam> distributeParams = params.getDistribute_by();
-    if (distributeParams != null) {
+    // Set the partitioning schemes
+    List<TKuduPartitionParam> partitionParams = params.getPartition_by();
+    if (partitionParams != null) {
       boolean hasRangePartitioning = false;
-      for (TDistributeParam distParam: distributeParams) {
-        if (distParam.isSetBy_hash_param()) {
-          Preconditions.checkState(!distParam.isSetBy_range_param());
-          tableOpts.addHashPartitions(distParam.getBy_hash_param().getColumns(),
-              distParam.getBy_hash_param().getNum_buckets());
+      for (TKuduPartitionParam partParam: partitionParams) {
+        if (partParam.isSetBy_hash_param()) {
+          Preconditions.checkState(!partParam.isSetBy_range_param());
+          tableOpts.addHashPartitions(partParam.getBy_hash_param().getColumns(),
+              partParam.getBy_hash_param().getNum_buckets());
         } else {
-          Preconditions.checkState(distParam.isSetBy_range_param());
+          Preconditions.checkState(partParam.isSetBy_range_param());
           hasRangePartitioning = true;
-          List<String> rangePartitionColumns = distParam.getBy_range_param().getColumns();
+          List<String> rangePartitionColumns = partParam.getBy_range_param().getColumns();
           tableOpts.setRangePartitionColumns(rangePartitionColumns);
           for (TRangePartition rangePartition:
-               distParam.getBy_range_param().getRange_partitions()) {
+               partParam.getBy_range_param().getRange_partitions()) {
             List<Pair<PartialRow, RangePartitionBound>> rangeBounds =
                 getRangePartitionBounds(rangePartition, schema, rangePartitionColumns);
             Preconditions.checkState(rangeBounds.size() == 2);
@@ -164,7 +164,7 @@ public class KuduCatalogOpExecutor {
           }
         }
       }
-      // If no range-based distribution is specified in a CREATE TABLE statement, Kudu
+      // If no range-based partitioning is specified in a CREATE TABLE statement, Kudu
       // generates one by default that includes all the primary key columns. To prevent
       // this from happening, explicitly set the range partition columns to be
       // an empty list.
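
As a sketch of the behavior the comment above guards against (hypothetical table, written only to illustrate):

    CREATE TABLE events (id BIGINT PRIMARY KEY)
    PARTITION BY HASH (id) INTO 16 BUCKETS
    STORED AS KUDU;

Without the explicitly empty range-column list, Kudu would attach its default range component over the primary key to this hash-only table; passing the empty list keeps the partition schema purely hash-based, exactly as written in the statement.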
@@ -333,7 +333,7 @@ public class KuduCatalogOpExecutor {
   private static List<Pair<PartialRow, RangePartitionBound>> getRangePartitionBounds(
       TRangePartition rangePartition, KuduTable tbl) throws ImpalaRuntimeException {
     return getRangePartitionBounds(rangePartition, tbl.getKuduSchema(),
-        tbl.getRangeDistributionColNames());
+        tbl.getRangePartitioningColNames());
   }
 
   /**
@@ -342,20 +342,20 @@ public class KuduCatalogOpExecutor {
    */
   private static List<Pair<PartialRow, RangePartitionBound>> getRangePartitionBounds(
       TRangePartition rangePartition, Schema schema,
-      List<String> rangeDistributionColNames) throws ImpalaRuntimeException {
+      List<String> rangePartitioningColNames) throws ImpalaRuntimeException {
     Preconditions.checkNotNull(schema);
-    Preconditions.checkState(!rangeDistributionColNames.isEmpty());
+    Preconditions.checkState(!rangePartitioningColNames.isEmpty());
     Preconditions.checkState(rangePartition.isSetLower_bound_values()
         || rangePartition.isSetUpper_bound_values());
     List<Pair<PartialRow, RangePartitionBound>> rangeBounds =
         Lists.newArrayListWithCapacity(2);
     Pair<PartialRow, RangePartitionBound> lowerBound =
-        KuduUtil.buildRangePartitionBound(schema, rangeDistributionColNames,
+        KuduUtil.buildRangePartitionBound(schema, rangePartitioningColNames,
         rangePartition.getLower_bound_values(),
         rangePartition.isIs_lower_bound_inclusive());
     rangeBounds.add(lowerBound);
     Pair<PartialRow, RangePartitionBound> upperBound =
-        KuduUtil.buildRangePartitionBound(schema, rangeDistributionColNames,
+        KuduUtil.buildRangePartitionBound(schema, rangePartitioningColNames,
         rangePartition.getUpper_bound_values(),
         rangePartition.isIs_upper_bound_inclusive());
     rangeBounds.add(upperBound);
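
The two bounds always travel as a (lower, upper) pair. For a hypothetical clause such as

    PARTITION 10 <= VALUES < 20

the TRangePartition carries lower_bound_values = [10] with is_lower_bound_inclusive = true, and upper_bound_values = [20] with is_upper_bound_inclusive = false; buildRangePartitionBound() converts each side into a Pair of PartialRow and RangePartitionBound for the Kudu client.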

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/cba93f1a/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java b/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java
index 4aeb96d..d806a2f 100644
--- a/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java
@@ -1440,16 +1440,16 @@ public class AnalyzeDDLTest extends FrontendTestBase {
         "Partition column name mismatch: tinyint_col != int_col");
 
     // CTAS into managed Kudu tables
-    AnalyzesOk("create table t primary key (id) distribute by hash (id) into 3 buckets" +
+    AnalyzesOk("create table t primary key (id) partition by hash (id) into 3 buckets" +
         " stored as kudu as select id, bool_col, tinyint_col, smallint_col, int_col, " +
         "bigint_col, float_col, double_col, date_string_col, string_col " +
         "from functional.alltypestiny");
-    AnalyzesOk("create table t primary key (id) distribute by range (id) " +
+    AnalyzesOk("create table t primary key (id) partition by range (id) " +
         "(partition values < 10, partition 20 <= values < 30, partition value = 50) " +
         "stored as kudu as select id, bool_col, tinyint_col, smallint_col, int_col, " +
         "bigint_col, float_col, double_col, date_string_col, string_col " +
         "from functional.alltypestiny");
-    AnalyzesOk("create table t primary key (id) distribute by hash (id) into 3 buckets, "+
+    AnalyzesOk("create table t primary key (id) partition by hash (id) into 3 buckets, "+
         "range (id) (partition values < 10, partition 10 <= values < 20, " +
         "partition value = 30) stored as kudu as select id, bool_col, tinyint_col, " +
         "smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, " +
@@ -1461,27 +1461,27 @@ public class AnalyzeDDLTest extends FrontendTestBase {
         "external Kudu tables.");
 
     // CTAS into Kudu tables with unsupported types
-    AnalysisError("create table t primary key (id) distribute by hash into 3 buckets" +
+    AnalysisError("create table t primary key (id) partition by hash into 3 buckets" +
         " stored as kudu as select id, timestamp_col from functional.alltypestiny",
         "Cannot create table 't': Type TIMESTAMP is not supported in Kudu");
-    AnalysisError("create table t primary key (cs) distribute by hash into 3 buckets" +
+    AnalysisError("create table t primary key (cs) partition by hash into 3 buckets" +
         " stored as kudu as select cs from functional.chars_tiny",
         "Cannot create table 't': Type CHAR(5) is not supported in Kudu");
-    AnalysisError("create table t primary key (vc) distribute by hash into 3 buckets" +
+    AnalysisError("create table t primary key (vc) partition by hash into 3 buckets" +
         " stored as kudu as select vc from functional.chars_tiny",
         "Cannot create table 't': Type VARCHAR(32) is not supported in Kudu");
-    AnalysisError("create table t primary key (id) distribute by hash into 3 buckets" +
+    AnalysisError("create table t primary key (id) partition by hash into 3 buckets" +
         " stored as kudu as select c1 as id from functional.decimal_tiny",
         "Cannot create table 't': Type DECIMAL(10,4) is not supported in Kudu");
-    AnalysisError("create table t primary key (id) distribute by hash into 3 buckets" +
+    AnalysisError("create table t primary key (id) partition by hash into 3 buckets" +
         " stored as kudu as select id, s from functional.complextypes_fileformat",
         "Expr 's' in select list returns a complex type 'STRUCT<f1:STRING,f2:INT>'.\n" +
         "Only scalar types are allowed in the select list.");
-    AnalysisError("create table t primary key (id) distribute by hash into 3 buckets" +
+    AnalysisError("create table t primary key (id) partition by hash into 3 buckets" +
         " stored as kudu as select id, m from functional.complextypes_fileformat",
         "Expr 'm' in select list returns a complex type 'MAP<STRING,BIGINT>'.\n" +
         "Only scalar types are allowed in the select list.");
-    AnalysisError("create table t primary key (id) distribute by hash into 3 buckets" +
+    AnalysisError("create table t primary key (id) partition by hash into 3 buckets" +
         " stored as kudu as select id, a from functional.complextypes_fileformat",
         "Expr 'a' in select list returns a complex type 'ARRAY<INT>'.\n" +
         "Only scalar types are allowed in the select list.");
@@ -1912,85 +1912,85 @@ public class AnalyzeDDLTest extends FrontendTestBase {
   @Test
   public void TestCreateManagedKuduTable() {
     TestUtils.assumeKuduIsSupported();
-    // Test primary keys and distribute by clauses
-    AnalyzesOk("create table tab (x int primary key) distribute by hash(x) " +
+    // Test primary keys and partition by clauses
+    AnalyzesOk("create table tab (x int primary key) partition by hash(x) " +
         "into 8 buckets stored as kudu");
-    AnalyzesOk("create table tab (x int, primary key(x)) distribute by hash(x) " +
+    AnalyzesOk("create table tab (x int, primary key(x)) partition by hash(x) " +
         "into 8 buckets stored as kudu");
     AnalyzesOk("create table tab (x int, y int, primary key (x, y)) " +
-        "distribute by hash(x, y) into 8 buckets stored as kudu");
+        "partition by hash(x, y) into 8 buckets stored as kudu");
     AnalyzesOk("create table tab (x int, y int, primary key (x)) " +
-        "distribute by hash(x) into 8 buckets stored as kudu");
+        "partition by hash(x) into 8 buckets stored as kudu");
     AnalyzesOk("create table tab (x int, y int, primary key(x, y)) " +
-        "distribute by hash(y) into 8 buckets stored as kudu");
-    AnalyzesOk("create table tab (x int, y string, primary key (x)) distribute by " +
+        "partition by hash(y) into 8 buckets stored as kudu");
+    AnalyzesOk("create table tab (x int, y string, primary key (x)) partition by " +
         "hash (x) into 3 buckets, range (x) (partition values < 1, partition " +
         "1 <= values < 10, partition 10 <= values < 20, partition value = 30) " +
         "stored as kudu");
-    AnalyzesOk("create table tab (x int, y int, primary key (x, y)) distribute by " +
+    AnalyzesOk("create table tab (x int, y int, primary key (x, y)) partition by " +
         "range (x, y) (partition value = (2001, 1), partition value = (2002, 1), " +
         "partition value = (2003, 2)) stored as kudu");
     // Non-literal boundary values in range partitions
-    AnalyzesOk("create table tab (x int, y int, primary key (x)) distribute by " +
+    AnalyzesOk("create table tab (x int, y int, primary key (x)) partition by " +
         "range (x) (partition values < 1 + 1, partition (1+3) + 2 < values < 10, " +
         "partition factorial(4) < values < factorial(5), " +
         "partition value = factorial(6)) stored as kudu");
-    AnalyzesOk("create table tab (x int, y int, primary key(x, y)) distribute by " +
+    AnalyzesOk("create table tab (x int, y int, primary key(x, y)) partition by " +
         "range(x, y) (partition value = (1+1, 2+2), partition value = ((1+1+1)+1, 10), " +
         "partition value = (cast (30 as int), factorial(5))) stored as kudu");
-    AnalysisError("create table tab (x int primary key) distribute by range (x) " +
+    AnalysisError("create table tab (x int primary key) partition by range (x) " +
         "(partition values < x + 1) stored as kudu", "Only constant values are allowed " +
         "for range-partition bounds: x + 1");
-    AnalysisError("create table tab (x int primary key) distribute by range (x) " +
+    AnalysisError("create table tab (x int primary key) partition by range (x) " +
         "(partition values <= isnull(null, null)) stored as kudu", "Range partition " +
         "values cannot be NULL. Range partition: 'PARTITION VALUES <= " +
         "isnull(NULL, NULL)'");
-    AnalysisError("create table tab (x int primary key) distribute by range (x) " +
+    AnalysisError("create table tab (x int primary key) partition by range (x) " +
         "(partition values <= (select count(*) from functional.alltypestiny)) " +
         "stored as kudu", "Only constant values are allowed for range-partition " +
         "bounds: (SELECT count(*) FROM functional.alltypestiny)");
     // Multilevel partitioning. Data is hash-split into 3 buckets based on 'x' and each
     // bucket is range-partitioned into 4 tablets based on 'y' (12 tablets in total).
     AnalyzesOk("create table tab (x int, y string, primary key(x, y)) " +
-        "distribute by hash(x) into 3 buckets, range(y) " +
+        "partition by hash(x) into 3 buckets, range(y) " +
         "(partition values < 'aa', partition 'aa' <= values < 'bb', " +
         "partition 'bb' <= values < 'cc', partition 'cc' <= values) " +
         "stored as kudu");
     // Key column in upper case
     AnalyzesOk("create table tab (x int, y int, primary key (X)) " +
-        "distribute by hash (x) into 8 buckets stored as kudu");
+        "partition by hash (x) into 8 buckets stored as kudu");
     // Flexible Partitioning
     AnalyzesOk("create table tab (a int, b int, c int, d int, primary key (a, b, c))" +
-        "distribute by hash (a, b) into 8 buckets, hash(c) into 2 buckets stored as " +
+        "partition by hash (a, b) into 8 buckets, hash(c) into 2 buckets stored as " +
         "kudu");
-    // No columns specified in the DISTRIBUTE BY HASH clause
+    // No columns specified in the PARTITION BY HASH clause
     AnalyzesOk("create table tab (a int primary key, b int, c int, d int) " +
-        "distribute by hash into 8 buckets stored as kudu");
+        "partition by hash into 8 buckets stored as kudu");
     // Range partition data types are picked up during analysis and forwarded to Kudu.
     // Column names in partition params should also be case-insensitive.
     AnalyzesOk("create table tab (a int, b int, c int, d int, primary key(a, b, c, d))" +
-        "distribute by hash (a, B, c) into 8 buckets, " +
+        "partition by hash (a, B, c) into 8 buckets, " +
         "range (A) (partition values < 1, partition 1 <= values < 2, " +
         "partition 2 <= values < 3, partition 3 <= values < 4, partition 4 <= values) " +
         "stored as kudu");
-    // Allowing range distribution on a subset of the primary keys
+    // Allowing range partitioning on a subset of the primary keys
     AnalyzesOk("create table tab (id int, name string, valf float, vali bigint, " +
-        "primary key (id, name)) distribute by range (name) " +
+        "primary key (id, name)) partition by range (name) " +
         "(partition 'aa' < values <= 'bb') stored as kudu");
     // Null values in range partition values
     AnalysisError("create table tab (id int, name string, primary key(id, name)) " +
-        "distribute by hash (id) into 3 buckets, range (name) " +
+        "partition by hash (id) into 3 buckets, range (name) " +
         "(partition value = null, partition value = 1) stored as kudu",
         "Range partition values cannot be NULL. Range partition: 'PARTITION " +
         "VALUE = NULL'");
     // Primary key specified in tblproperties
-    AnalysisError(String.format("create table tab (x int) distribute by hash (x) " +
+    AnalysisError(String.format("create table tab (x int) partition by hash (x) " +
         "into 8 buckets stored as kudu tblproperties ('%s' = 'x')",
         KuduTable.KEY_KEY_COLUMNS), "PRIMARY KEY must be used instead of the table " +
         "property");
     // Primary key column that doesn't exist
     AnalysisError("create table tab (x int, y int, primary key (z)) " +
-        "distribute by hash (x) into 8 buckets stored as kudu",
+        "partition by hash (x) into 8 buckets stored as kudu",
         "PRIMARY KEY column 'z' does not exist in the table");
     // Invalid composite primary key
     AnalysisError("create table tab (x int primary key, primary key(x)) stored " +
@@ -2002,65 +2002,65 @@ public class AnalyzeDDLTest extends FrontendTestBase {
         "be specified using the PRIMARY KEY (col1, col2, ...) syntax at the end " +
         "of the column definition.");
     // Specifying the same primary key column multiple times
-    AnalysisError("create table tab (x int, primary key (x, x)) distribute by hash (x) " +
+    AnalysisError("create table tab (x int, primary key (x, x)) partition by hash (x) " +
         "into 8 buckets stored as kudu",
         "Column 'x' is listed multiple times as a PRIMARY KEY.");
     // Number of range partition boundary values should be equal to the number of range
     // columns.
     AnalysisError("create table tab (a int, b int, c int, d int, primary key(a, b, c)) " +
-        "distribute by range(a) (partition value = (1, 2), " +
+        "partition by range(a) (partition value = (1, 2), " +
         "partition value = 3, partition value = 4) stored as kudu",
         "Number of specified range partition values is different than the number of " +
-        "distribution columns: (2 vs 1). Range partition: 'PARTITION VALUE = (1,2)'");
+        "partitioning columns: (2 vs 1). Range partition: 'PARTITION VALUE = (1,2)'");
     // Key ranges must match the column types.
     AnalysisError("create table tab (a int, b int, c int, d int, primary key(a, b, c)) " +
-        "distribute by hash (a, b, c) into 8 buckets, range (a) " +
+        "partition by hash (a, b, c) into 8 buckets, range (a) " +
         "(partition value = 1, partition value = 'abc', partition 3 <= values) " +
         "stored as kudu", "Range partition value 'abc' (type: STRING) is not type " +
-        "compatible with distribution column 'a' (type: INT).");
-    AnalysisError("create table tab (a tinyint primary key) distribute by range (a) " +
+        "compatible with partitioning column 'a' (type: INT).");
+    AnalysisError("create table tab (a tinyint primary key) partition by range (a) " +
         "(partition value = 128) stored as kudu", "Range partition value 128 " +
-        "(type: SMALLINT) is not type compatible with distribution column 'a' " +
+        "(type: SMALLINT) is not type compatible with partitioning column 'a' " +
         "(type: TINYINT)");
-    AnalysisError("create table tab (a smallint primary key) distribute by range (a) " +
+    AnalysisError("create table tab (a smallint primary key) partition by range (a) " +
         "(partition value = 32768) stored as kudu", "Range partition value 32768 " +
-        "(type: INT) is not type compatible with distribution column 'a' " +
+        "(type: INT) is not type compatible with partitioning column 'a' " +
         "(type: SMALLINT)");
-    AnalysisError("create table tab (a int primary key) distribute by range (a) " +
+    AnalysisError("create table tab (a int primary key) partition by range (a) " +
         "(partition value = 2147483648) stored as kudu", "Range partition value " +
-        "2147483648 (type: BIGINT) is not type compatible with distribution column 'a' " +
+        "2147483648 (type: BIGINT) is not type compatible with partitioning column 'a' " +
         "(type: INT)");
-    AnalysisError("create table tab (a bigint primary key) distribute by range (a) " +
+    AnalysisError("create table tab (a bigint primary key) partition by range (a) " +
         "(partition value = 9223372036854775808) stored as kudu", "Range partition " +
         "value 9223372036854775808 (type: DECIMAL(19,0)) is not type compatible with " +
-        "distribution column 'a' (type: BIGINT)");
+        "partitioning column 'a' (type: BIGINT)");
     // Test implicit casting/folding of partition values.
-    AnalyzesOk("create table tab (a int primary key) distribute by range (a) " +
+    AnalyzesOk("create table tab (a int primary key) partition by range (a) " +
         "(partition value = false, partition value = true) stored as kudu");
-    // Non-key column used in DISTRIBUTE BY
+    // Non-key column used in PARTITION BY
     AnalysisError("create table tab (a int, b string, c bigint, primary key (a)) " +
-        "distribute by range (b) (partition value = 'abc') stored as kudu",
+        "partition by range (b) (partition value = 'abc') stored as kudu",
         "Column 'b' in 'RANGE (b) (PARTITION VALUE = 'abc')' is not a key column. " +
-        "Only key columns can be used in DISTRIBUTE BY.");
+        "Only key columns can be used in PARTITION BY.");
     // No float range partition values
     AnalysisError("create table tab (a int, b int, c int, d int, primary key (a, b, c))" +
-        "distribute by hash (a, b, c) into 8 buckets, " +
+        "partition by hash (a, b, c) into 8 buckets, " +
         "range (a) (partition value = 1.2, partition value = 2) stored as kudu",
         "Range partition value 1.2 (type: DECIMAL(2,1)) is not type compatible with " +
-        "distribution column 'a' (type: INT).");
-    // Non-existing column used in DISTRIBUTE BY
+        "partitioning column 'a' (type: INT).");
+    // Non-existing column used in PARTITION BY
     AnalysisError("create table tab (a int, b int, primary key (a, b)) " +
-        "distribute by range(unknown_column) (partition value = 'abc') stored as kudu",
+        "partition by range(unknown_column) (partition value = 'abc') stored as kudu",
         "Column 'unknown_column' in 'RANGE (unknown_column) (PARTITION VALUE = 'abc')' " +
-        "is not a key column. Only key columns can be used in DISTRIBUTE BY");
+        "is not a key column. Only key columns can be used in PARTITION BY");
     // Kudu table name is specified in tblproperties
-    AnalyzesOk("create table tab (x int primary key) distribute by hash (x) " +
+    AnalyzesOk("create table tab (x int primary key) partition by hash (x) " +
         "into 8 buckets stored as kudu tblproperties ('kudu.table_name'='tab_1'," +
         "'kudu.num_tablet_replicas'='1'," +
         "'kudu.master_addresses' = '127.0.0.1:8080, 127.0.0.1:8081')");
     // No port is specified in kudu master address
     AnalyzesOk("create table tdata_no_port (id int primary key, name string, " +
-        "valf float, vali bigint) distribute by range(id) (partition values <= 10, " +
+        "valf float, vali bigint) partition by range(id) (partition values <= 10, " +
         "partition 10 < values <= 30, partition 30 < values) " +
         "stored as kudu tblproperties('kudu.master_addresses'='127.0.0.1')");
     // Not using the STORED AS KUDU syntax to specify a Kudu table
@@ -2078,12 +2078,12 @@ public class AnalyzeDDLTest extends FrontendTestBase {
     AnalysisError("create table tab (x int primary key) stored as kudu cached in " +
         "'testPool'", "A Kudu table cannot be cached in HDFS.");
     // LOCATION cannot be used with Kudu tables
-    AnalysisError("create table tab (a int primary key) distribute by hash (a) " +
+    AnalysisError("create table tab (a int primary key) partition by hash (a) " +
         "into 3 buckets stored as kudu location '/test-warehouse/'",
         "LOCATION cannot be specified for a Kudu table.");
-    // DISTRIBUTE BY is required for managed tables.
+    // PARTITION BY is required for managed tables.
     AnalysisError("create table tab (a int, primary key (a)) stored as kudu",
-        "Table distribution must be specified for managed Kudu tables.");
+        "Table partitioning must be specified for managed Kudu tables.");
     AnalysisError("create table tab (a int) stored as kudu",
         "A primary key is required for a Kudu table.");
     // Using ROW FORMAT with a Kudu table
@@ -2105,12 +2105,12 @@ public class AnalyzeDDLTest extends FrontendTestBase {
 
       // Unsupported type is PK and partition col
       String stmt = String.format("create table tab (x %s primary key) " +
-          "distribute by hash(x) into 3 buckets stored as kudu", t);
+          "partition by hash(x) into 3 buckets stored as kudu", t);
       AnalysisError(stmt, expectedError);
 
       // Unsupported type is not PK/partition col
       stmt = String.format("create table tab (x int primary key, y %s) " +
-          "distribute by hash(x) into 3 buckets stored as kudu", t);
+          "partition by hash(x) into 3 buckets stored as kudu", t);
       AnalysisError(stmt, expectedError);
     }
 
@@ -2125,7 +2125,7 @@ public class AnalyzeDDLTest extends FrontendTestBase {
             for (String block: blockSize) {
               AnalyzesOk(String.format("create table tab (x int primary key " +
                   "not null encoding %s compression %s %s %s, y int encoding %s " +
-                  "compression %s %s %s %s) distribute by hash (x) " +
+                  "compression %s %s %s %s) partition by hash (x) " +
                   "into 3 buckets stored as kudu", enc, comp, def, block, enc,
                   comp, def, nul, block));
             }
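
Spelled out, one iteration of that loop produces DDL along these lines (the option values here are placeholders, since the enc/comp/def/nul/block arrays are defined outside this hunk):

    CREATE TABLE tab (
      x INT PRIMARY KEY NOT NULL ENCODING PLAIN_ENCODING COMPRESSION SNAPPY
          DEFAULT 1 BLOCK_SIZE 4096,
      y INT ENCODING PLAIN_ENCODING COMPRESSION SNAPPY DEFAULT 1 NULL BLOCK_SIZE 4096
    )
    PARTITION BY HASH (x) INTO 3 BUCKETS
    STORED AS KUDU;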
@@ -2136,23 +2136,23 @@ public class AnalyzeDDLTest extends FrontendTestBase {
     // Primary key specified using the PRIMARY KEY clause
     AnalyzesOk("create table tab (x int not null encoding plain_encoding " +
         "compression snappy block_size 1, y int null encoding rle compression lz4 " +
-        "default 1, primary key(x)) distribute by hash (x) into 3 buckets " +
+        "default 1, primary key(x)) partition by hash (x) into 3 buckets " +
         "stored as kudu");
     // Primary keys can't be null
     AnalysisError("create table tab (x int primary key null, y int not null) " +
-        "distribute by hash (x) into 3 buckets stored as kudu", "Primary key columns " +
+        "partition by hash (x) into 3 buckets stored as kudu", "Primary key columns " +
         "cannot be nullable: x INT PRIMARY KEY NULL");
     AnalysisError("create table tab (x int not null, y int null, primary key (x, y)) " +
-        "distribute by hash (x) into 3 buckets stored as kudu", "Primary key columns " +
+        "partition by hash (x) into 3 buckets stored as kudu", "Primary key columns " +
         "cannot be nullable: y INT NULL");
     // Unsupported encoding value
     AnalysisError("create table tab (x int primary key, y int encoding invalid_enc) " +
-        "distribute by hash (x) into 3 buckets stored as kudu", "Unsupported encoding " +
+        "partition by hash (x) into 3 buckets stored as kudu", "Unsupported encoding " +
         "value 'INVALID_ENC'. Supported encoding values are: " +
         Joiner.on(", ").join(Encoding.values()));
     // Unsupported compression algorithm
     AnalysisError("create table tab (x int primary key, y int compression " +
-        "invalid_comp) distribute by hash (x) into 3 buckets stored as kudu",
+        "invalid_comp) partition by hash (x) into 3 buckets stored as kudu",
         "Unsupported compression algorithm 'INVALID_COMP'. Supported compression " +
         "algorithms are: " + Joiner.on(", ").join(CompressionAlgorithm.values()));
     // Default values
@@ -2160,38 +2160,38 @@ public class AnalyzeDDLTest extends FrontendTestBase {
         "i3 int default 100, i4 bigint default 1000, vals string default 'test', " +
         "valf float default cast(1.2 as float), vald double default " +
         "cast(3.1452 as double), valb boolean default true, " +
-        "primary key (i1, i2, i3, i4, vals)) distribute by hash (i1) into 3 " +
+        "primary key (i1, i2, i3, i4, vals)) partition by hash (i1) into 3 " +
         "buckets stored as kudu");
     AnalyzesOk("create table tab (i int primary key default 1+1+1) " +
-        "distribute by hash (i) into 3 buckets stored as kudu");
+        "partition by hash (i) into 3 buckets stored as kudu");
     AnalyzesOk("create table tab (i int primary key default factorial(5)) " +
-        "distribute by hash (i) into 3 buckets stored as kudu");
+        "partition by hash (i) into 3 buckets stored as kudu");
     AnalyzesOk("create table tab (i int primary key, x int null default " +
-        "isnull(null, null)) distribute by hash (i) into 3 buckets stored as kudu");
+        "isnull(null, null)) partition by hash (i) into 3 buckets stored as kudu");
     // Invalid default values
     AnalysisError("create table tab (i int primary key default 'string_val') " +
-        "distribute by hash (i) into 3 buckets stored as kudu", "Default value " +
+        "partition by hash (i) into 3 buckets stored as kudu", "Default value " +
         "'string_val' (type: STRING) is not compatible with column 'i' (type: INT).");
     AnalysisError("create table tab (i int primary key, x int default 1.1) " +
-        "distribute by hash (i) into 3 buckets stored as kudu",
+        "partition by hash (i) into 3 buckets stored as kudu",
         "Default value 1.1 (type: DECIMAL(2,1)) is not compatible with column " +
         "'x' (type: INT).");
     AnalysisError("create table tab (i tinyint primary key default 128) " +
-        "distribute by hash (i) into 3 buckets stored as kudu", "Default value " +
+        "partition by hash (i) into 3 buckets stored as kudu", "Default value " +
         "128 (type: SMALLINT) is not compatible with column 'i' (type: TINYINT).");
     AnalysisError("create table tab (i int primary key default isnull(null, null)) " +
-        "distribute by hash (i) into 3 buckets stored as kudu", "Default value of " +
+        "partition by hash (i) into 3 buckets stored as kudu", "Default value of " +
         "NULL not allowed on non-nullable column: 'i'");
     AnalysisError("create table tab (i int primary key, x int not null " +
-        "default isnull(null, null)) distribute by hash (i) into 3 buckets " +
+        "default isnull(null, null)) partition by hash (i) into 3 buckets " +
         "stored as kudu", "Default value of NULL not allowed on non-nullable column: " +
         "'x'");
     // Invalid block_size values
     AnalysisError("create table tab (i int primary key block_size 1.1) " +
-        "distribute by hash (i) into 3 buckets stored as kudu", "Invalid value " +
+        "partition by hash (i) into 3 buckets stored as kudu", "Invalid value " +
         "for BLOCK_SIZE: 1.1. A positive INTEGER value is expected.");
     AnalysisError("create table tab (i int primary key block_size 'val') " +
-        "distribute by hash (i) into 3 buckets stored as kudu", "Invalid value " +
+        "partition by hash (i) into 3 buckets stored as kudu", "Invalid value " +
         "for BLOCK_SIZE: 'val'. A positive INTEGER value is expected.");
   }
 
@@ -2438,8 +2438,8 @@ public class AnalyzeDDLTest extends FrontendTestBase {
 
     // Kudu specific clauses used in an Avro table.
     AnalysisError("create table functional.new_table (i int) " +
-        "distribute by hash(i) into 3 buckets stored as avro",
-        "Only Kudu tables can use the DISTRIBUTE BY clause.");
+        "partition by hash(i) into 3 buckets stored as avro",
+        "Only Kudu tables can use the PARTITION BY clause.");
     AnalysisError("create table functional.new_table (i int primary key) " +
         "stored as avro", "Unsupported column options for file format 'AVRO': " +
         "'i INT PRIMARY KEY'");

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/cba93f1a/fe/src/test/java/org/apache/impala/analysis/AuthorizationTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/analysis/AuthorizationTest.java b/fe/src/test/java/org/apache/impala/analysis/AuthorizationTest.java
index 431ac01..3d6bbec 100644
--- a/fe/src/test/java/org/apache/impala/analysis/AuthorizationTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/AuthorizationTest.java
@@ -911,7 +911,7 @@ public class AuthorizationTest {
 
     // IMPALA-4000: ALL privileges on SERVER are not required to create managed tables.
     AuthzOk("create table tpch.kudu_tbl (i int, j int, primary key (i))" +
-        " DISTRIBUTE BY HASH (i) INTO 9 BUCKETS stored as kudu TBLPROPERTIES " +
+        " PARTITION BY HASH (i) INTO 9 BUCKETS stored as kudu TBLPROPERTIES " +
         "('kudu.master_addresses'='127.0.0.1')");
 
     // User does not have permission to create a table at the specified location.