Posted to commits@impala.apache.org by ta...@apache.org on 2016/10/25 20:16:03 UTC

[16/33] incubator-impala git commit: IMPALA-3719: Simplify CREATE TABLE statements with Kudu tables

IMPALA-3719: Simplify CREATE TABLE statements with Kudu tables

With this commit we simplify the syntax and handling of CREATE TABLE
statements for both managed and external Kudu tables.

Syntax example:
CREATE TABLE foo(a INT, b STRING, PRIMARY KEY (a, b))
DISTRIBUTE BY HASH (a) INTO 3 BUCKETS,
RANGE (b) SPLIT ROWS (('abc'), ('def'))
STORED AS KUDU
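
The corresponding external-table form maps to an existing Kudu table.
A minimal sketch (names are hypothetical; the underlying Kudu table
must already exist, see change 3 below):

CREATE EXTERNAL TABLE foo_ext
STORED AS KUDU
TBLPROPERTIES ('kudu.table_name' = 'foo')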

Changes:
1) Remove the requirement to specify table properties such as key
   columns in TBLPROPERTIES.
2) Read table schema (column definitions, primary keys, and distribution
   schemes) from Kudu instead of the HMS.
3) For external tables, the underlying Kudu table must now exist at the
   time the table is created in Impala.
4) Disallow table properties that could conflict with an existing
   table. Ex: key_columns cannot be specified.
5) Add KUDU as a file format.
6) Add a startup flag to impalad to specify the default Kudu master
   addresses. The flag is used as the default value for the table
   property kudu_master_addresses, but it can still be overridden
   using TBLPROPERTIES.
7) Fix a post-merge issue (IMPALA-3178) where DROP DATABASE CASCADE was
   not implemented for Kudu tables and was silently ignored, leaving the
   underlying tables in Kudu.
8) Remove DDL delegates. There was only one functional delegate (for
   Kudu); the existence of the other delegate and the use of delegates
   in general led to confusion. The Kudu delegate existed only to
   provide functionality missing from Hive.
9) Add PRIMARY KEY at the column and table level (see the sketches after
   this list). This syntax is fairly standard. When used at the column
   level, only one column can be marked as a key; when used at the table
   level, multiple columns can form the key. Only Kudu tables are allowed
   to use PRIMARY KEY. The old "kudu.key_columns" table property is no
   longer accepted, though it is still used internally. "PRIMARY" is now
   a keyword, while "KEY" is parsed as an identifier rather than a
   keyword because it is also used in nested map types.
10) For managed tables, infer a Kudu table name if none is given. The
   table property "kudu.table_name" is optional for managed tables and
   required for external tables. If no Kudu table name is provided for
   a managed table, one is generated from the HMS database and table
   name.
11) Use the Kudu master as the source of truth for table metadata,
   instead of the HMS, when a table is loaded or refreshed. Table and
   column metadata are cached in the catalog and stored in the HMS so
   that table and column statistics can be used.
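
Examples (minimal sketches; table and column names are hypothetical):

-- Column-level PRIMARY KEY: only a single column can be marked.
CREATE TABLE t1 (id BIGINT PRIMARY KEY, s STRING)
DISTRIBUTE BY HASH (id) INTO 16 BUCKETS
STORED AS KUDU

-- Table-level PRIMARY KEY: a composite key over multiple columns. No
-- "kudu.table_name" is given, so one is generated from the HMS database
-- and table name (change 10).
CREATE TABLE t2 (a INT, b STRING, PRIMARY KEY (a, b))
DISTRIBUTE BY HASH (a) INTO 16 BUCKETS
STORED AS KUDU

-- CTAS into a managed Kudu table, following the grammar added in this
-- commit:
CREATE TABLE t3 PRIMARY KEY (id)
DISTRIBUTE BY HASH (id) INTO 16 BUCKETS
STORED AS KUDU
AS SELECT id, s FROM t1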

Change-Id: I7b9d51b2720ab57649abdb7d5c710ea04ff50dc1
Reviewed-on: http://gerrit.cloudera.org:8080/4414
Reviewed-by: Alex Behm <al...@cloudera.com>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/041fa6d9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/041fa6d9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/041fa6d9

Branch: refs/heads/hadoop-next
Commit: 041fa6d946e1cbe309593c4a5515bef88e06cdb4
Parents: f8d48b8
Author: Dimitris Tsirogiannis <dt...@cloudera.com>
Authored: Wed Aug 31 16:56:47 2016 -0700
Committer: Internal Jenkins <cl...@gerrit.cloudera.org>
Committed: Fri Oct 21 10:52:25 2016 +0000

----------------------------------------------------------------------
 be/src/service/frontend.cc                      |   9 +-
 bin/start-catalogd.sh                           |  30 +-
 bin/start-impala-cluster.py                     |  11 +-
 common/thrift/CatalogObjects.thrift             |  39 +-
 common/thrift/JniCatalog.thrift                 |  44 +-
 fe/src/main/cup/sql-parser.cup                  | 407 +++++++-------
 .../apache/impala/analysis/AnalysisUtils.java   |  43 ++
 .../org/apache/impala/analysis/ColumnDef.java   |  66 ++-
 .../analysis/CreateTableAsSelectStmt.java       |  41 +-
 .../impala/analysis/CreateTableDataSrcStmt.java |  30 +-
 .../analysis/CreateTableLikeFileStmt.java       |  29 +-
 .../impala/analysis/CreateTableLikeStmt.java    |  19 +-
 .../apache/impala/analysis/CreateTableStmt.java | 480 ++++++++--------
 .../apache/impala/analysis/DistributeParam.java | 211 ++++---
 .../org/apache/impala/analysis/ModifyStmt.java  |   4 +-
 .../apache/impala/analysis/TableDataLayout.java |  56 ++
 .../org/apache/impala/analysis/TableDef.java    | 316 +++++++++++
 .../org/apache/impala/analysis/ToSqlUtils.java  |  68 ++-
 .../java/org/apache/impala/catalog/Catalog.java |   4 +
 .../impala/catalog/CatalogServiceCatalog.java   |   7 +-
 .../main/java/org/apache/impala/catalog/Db.java |   5 +
 .../apache/impala/catalog/HdfsFileFormat.java   |  15 +-
 .../apache/impala/catalog/ImpaladCatalog.java   |   8 +-
 .../org/apache/impala/catalog/KuduTable.java    | 318 +++++++----
 .../java/org/apache/impala/catalog/Table.java   |   5 +
 .../org/apache/impala/catalog/TableLoader.java  |   1 -
 .../java/org/apache/impala/catalog/Type.java    |   9 +
 .../impala/catalog/delegates/DdlDelegate.java   |  75 ---
 .../catalog/delegates/KuduDdlDelegate.java      | 190 -------
 .../delegates/UnsupportedOpDelegate.java        |  35 --
 .../impala/planner/HdfsPartitionFilter.java     |   2 +-
 .../org/apache/impala/planner/KuduScanNode.java |   2 +-
 .../impala/service/CatalogOpExecutor.java       | 413 ++++++++------
 .../org/apache/impala/service/Frontend.java     |   9 +-
 .../org/apache/impala/service/JniCatalog.java   |   2 +-
 .../org/apache/impala/service/JniFrontend.java  |   5 +-
 .../impala/service/KuduCatalogOpExecutor.java   | 240 ++++++++
 .../java/org/apache/impala/util/KuduUtil.java   | 106 ++--
 fe/src/main/jflex/sql-scanner.flex              |   2 +
 .../apache/impala/analysis/AnalyzeDDLTest.java  | 409 ++++++++------
 .../org/apache/impala/analysis/ParserTest.java  |  32 +-
 .../org/apache/impala/service/JdbcTest.java     |   6 +-
 .../impala/testutil/ImpaladTestCatalog.java     |   6 +-
 infra/python/deps/download_requirements         |   2 +-
 infra/python/deps/requirements.txt              |   6 +-
 testdata/bin/generate-schema-statements.py      |  20 +-
 .../functional/functional_schema_template.sql   | 143 +----
 testdata/datasets/tpch/tpch_schema_template.sql | 120 +---
 .../queries/PlannerTest/kudu.test               |  28 +-
 .../queries/QueryTest/create_kudu.test          |  90 ---
 .../queries/QueryTest/kudu-scan-node.test       |  34 +-
 .../queries/QueryTest/kudu-show-create.test     |  16 -
 .../queries/QueryTest/kudu_alter.test           |  21 +-
 .../queries/QueryTest/kudu_create.test          | 105 ++++
 .../queries/QueryTest/kudu_crud.test            |  67 +--
 .../queries/QueryTest/kudu_partition_ddl.test   | 103 +---
 .../queries/QueryTest/kudu_stats.test           |  12 +-
 tests/common/__init__.py                        |   4 +-
 tests/common/kudu_test_suite.py                 | 148 +++++
 tests/conftest.py                               |  10 +-
 tests/custom_cluster/test_kudu.py               |  53 ++
 tests/metadata/test_ddl.py                      |   3 +-
 tests/metadata/test_show_create_table.py        |   5 -
 tests/query_test/test_kudu.py                   | 560 ++++++++++++++-----
 64 files changed, 3121 insertions(+), 2238 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/041fa6d9/be/src/service/frontend.cc
----------------------------------------------------------------------
diff --git a/be/src/service/frontend.cc b/be/src/service/frontend.cc
index 855924f..ca8ecbb 100644
--- a/be/src/service/frontend.cc
+++ b/be/src/service/frontend.cc
@@ -58,11 +58,13 @@ DEFINE_string(authorized_proxy_user_config, "",
     "users. For example: hue=user1,user2;admin=*");
 DEFINE_string(authorized_proxy_user_config_delimiter, ",",
     "Specifies the delimiter used in authorized_proxy_user_config. ");
-
+DEFINE_string(kudu_master_hosts, "", "Specifies the default Kudu master(s). The given "
+    "value should be a comma separated list of hostnames or IP addresses; ports are "
+    "optional.");
 Frontend::Frontend() {
   JniMethodDescriptor methods[] = {
     {"<init>", "(ZLjava/lang/String;Ljava/lang/String;Ljava/lang/String;"
-        "Ljava/lang/String;IIZ)V", &fe_ctor_},
+        "Ljava/lang/String;IIZLjava/lang/String;)V", &fe_ctor_},
     {"createExecRequest", "([B)[B", &create_exec_request_id_},
     {"getExplainPlan", "([B)Ljava/lang/String;", &get_explain_plan_id_},
     {"getHadoopConfig", "([B)[B", &get_hadoop_config_id_},
@@ -111,9 +113,10 @@ Frontend::Frontend() {
   // auth_to_local rules are read if --load_auth_to_local_rules is set to true
   // and impala is kerberized.
   jboolean auth_to_local = FLAGS_load_auth_to_local_rules && !FLAGS_principal.empty();
+  jstring kudu_master_hosts = jni_env->NewStringUTF(FLAGS_kudu_master_hosts.c_str());
   jobject fe = jni_env->NewObject(fe_class_, fe_ctor_, lazy, server_name,
       policy_file_path, sentry_config, auth_provider_class, FlagToTLogLevel(FLAGS_v),
-      FlagToTLogLevel(FLAGS_non_impala_java_vlog), auth_to_local);
+      FlagToTLogLevel(FLAGS_non_impala_java_vlog), auth_to_local, kudu_master_hosts);
   EXIT_IF_EXC(jni_env);
   ABORT_IF_ERROR(JniUtil::LocalToGlobalRef(jni_env, fe, &fe_));
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/041fa6d9/bin/start-catalogd.sh
----------------------------------------------------------------------
diff --git a/bin/start-catalogd.sh b/bin/start-catalogd.sh
index 05eb8bd..1046c82 100755
--- a/bin/start-catalogd.sh
+++ b/bin/start-catalogd.sh
@@ -34,29 +34,17 @@ JVM_ARGS=""
 for ARG in $*
 do
   case "$ARG" in
-    -build_type=debug)
-      BUILD_TYPE=debug
-      ;;
-    -build_type=release)
-      BUILD_TYPE=release
-      ;;
-    -build_type=latest)
-      ;;
+    -build_type=debug) BUILD_TYPE=debug;;
+    -build_type=release) BUILD_TYPE=release;;
+    -build_type=latest) ;;
     -build_type=*)
       echo "Invalid build type. Valid values are: debug, release"
-      exit 1
-      ;;
-    -jvm_debug_port=*)
-      JVM_DEBUG_PORT="${ARG#*=}"
-      ;;
-    -jvm_suspend)
-      JVM_SUSPEND="y"
-      ;;
-    -jvm_args=*)
-      JVM_ARGS="${ARG#*=}"
-      ;;
-    *)
-      CATALOGD_ARGS="${CATALOGD_ARGS} ${ARG}"
+      exit 1;;
+    -jvm_debug_port=*) JVM_DEBUG_PORT="${ARG#*=}";;
+    -jvm_suspend) JVM_SUSPEND="y";;
+    -jvm_args=*) JVM_ARGS="${ARG#*=}";;
+    -kudu_masters=*) CATALOGD_ARGS+=" ${ARG#*=}";;
+    *) CATALOGD_ARGS="${CATALOGD_ARGS} ${ARG}";;
   esac
 done
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/041fa6d9/bin/start-impala-cluster.py
----------------------------------------------------------------------
diff --git a/bin/start-impala-cluster.py b/bin/start-impala-cluster.py
index 3ea338a..df7dea6 100755
--- a/bin/start-impala-cluster.py
+++ b/bin/start-impala-cluster.py
@@ -27,6 +27,7 @@ from getpass import getuser
 from time import sleep, time
 from optparse import OptionParser
 from testdata.common import cgroups
+from tests.common import KUDU_MASTER_HOSTS
 
 
 DEFAULT_IMPALA_MAX_LOG_FILES = os.environ.get('IMPALA_MAX_LOG_FILES', 10)
@@ -57,7 +58,8 @@ parser.add_option("-r", "--restart_impalad_only", dest="restart_impalad_only",
                   help="Restarts only the impalad processes")
 parser.add_option("--in-process", dest="inprocess", action="store_true", default=False,
                   help="Start all Impala backends and state store in a single process.")
-parser.add_option("--log_dir", dest="log_dir", default=os.environ['IMPALA_CLUSTER_LOGS_DIR'],
+parser.add_option("--log_dir", dest="log_dir",
+                  default=os.environ['IMPALA_CLUSTER_LOGS_DIR'],
                   help="Directory to store output logs to.")
 parser.add_option('--max_log_files', default=DEFAULT_IMPALA_MAX_LOG_FILES,
                   help='Max number of log files before rotation occurs.')
@@ -70,6 +72,9 @@ parser.add_option("--log_level", type="int", dest="log_level", default=1,
                    help="Set the impalad backend logging level")
 parser.add_option("--jvm_args", dest="jvm_args", default="",
                   help="Additional arguments to pass to the JVM(s) during startup.")
+parser.add_option("--kudu_masters", default=KUDU_MASTER_HOSTS,
+                  help="The host name or address of the Kudu master. Multiple masters "
+                      "can be specified using a comma separated list.")
 
 options, args = parser.parse_args()
 
@@ -193,7 +198,6 @@ def build_impalad_port_args(instance_num):
                           BASE_WEBSERVER_PORT + instance_num)
 
 def build_impalad_logging_args(instance_num, service_name):
-  log_file_path = os.path.join(options.log_dir, "%s.INFO" % service_name)
   return BE_LOGGING_ARGS % (service_name, options.log_dir, options.log_level,
                             options.max_log_files)
 
@@ -233,6 +237,9 @@ def start_impalad_instances(cluster_size):
           (mem_limit,  # Goes first so --impalad_args will override it.
            build_impalad_logging_args(i, service_name), build_jvm_args(i),
            build_impalad_port_args(i), param_args)
+    if options.kudu_masters:
+      # Must be prepended, otherwise the java options interfere.
+      args = "-kudu_master_hosts %s %s" % (options.kudu_masters, args)
     stderr_log_file_path = os.path.join(options.log_dir, '%s-error.log' % service_name)
     exec_impala_process(IMPALAD_PATH, args, stderr_log_file_path)
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/041fa6d9/common/thrift/CatalogObjects.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/CatalogObjects.thrift b/common/thrift/CatalogObjects.thrift
index 5378988..78aa19d 100644
--- a/common/thrift/CatalogObjects.thrift
+++ b/common/thrift/CatalogObjects.thrift
@@ -49,14 +49,20 @@ enum TTableType {
   KUDU_TABLE,
 }
 
+// TODO: Separate the storage engines (e.g. Kudu) from the file formats.
+// TODO: Make the names consistent with the file format keywords specified in
+// the parser.
 enum THdfsFileFormat {
   TEXT,
   RC_FILE,
   SEQUENCE_FILE,
   AVRO,
-  PARQUET
+  PARQUET,
+  KUDU
 }
 
+// TODO: Since compression is also enabled for Kudu columns, we should
+// rename this enum to not be Hdfs specific.
 enum THdfsCompression {
   NONE,
   DEFAULT,
@@ -337,6 +343,34 @@ struct TDataSourceTable {
   2: required string init_string
 }
 
+// Parameters needed for hash distribution
+struct TDistributeByHashParam {
+  1: required list<string> columns
+  2: required i32 num_buckets
+}
+
+struct TRangeLiteral {
+  1: optional i64 int_literal
+  2: optional string string_literal
+}
+
+struct TRangeLiteralList {
+  // TODO: Replace TRangeLiteral with Exprs.TExpr.
+  1: required list<TRangeLiteral> values
+}
+
+// A range distribution is identified by a list of columns and a series of split rows.
+struct TDistributeByRangeParam {
+  1: required list<string> columns
+  2: optional list<TRangeLiteralList> split_rows;
+}
+
+// Parameters for the DISTRIBUTE BY clause.
+struct TDistributeParam {
+  1: optional TDistributeByHashParam by_hash_param;
+  2: optional TDistributeByRangeParam by_range_param;
+}
+
 // Represents a Kudu table
 struct TKuduTable {
   1: required string table_name
@@ -346,6 +380,9 @@ struct TKuduTable {
 
   // Name of the key columns
   3: required list<string> key_columns
+
+  // Distribution schemes
+  4: required list<TDistributeParam> distribute_by
 }
 
 // Represents a table or view.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/041fa6d9/common/thrift/JniCatalog.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/JniCatalog.thrift b/common/thrift/JniCatalog.thrift
index b97e458..8ed9fe3 100644
--- a/common/thrift/JniCatalog.thrift
+++ b/common/thrift/JniCatalog.thrift
@@ -155,45 +155,6 @@ struct THdfsCachingOp {
   3: optional i16 replication
 }
 
-// Enum listing all possible DISTRIBUTE BY types
-enum TDistributeType {
-  HASH,
-  RANGE,
-}
-
-// Parameters needed for hash distribution
-struct TDistributeByHashParam {
-  1: required list<string> columns
-  2: required i32 num_buckets
-}
-
-struct TRangeLiteral {
-  1: optional i64 int_literal
-  2: optional double float_literal
-  3: optional string string_literal
-  4: optional bool bool_literal
-}
-
-struct TRangeLiteralList {
-  1: required list<TRangeLiteral> values
-}
-
-// A range distribution is identified by a list of columns and a series of split rows.
-struct TDistributeByRangeParam {
-  1: required list<string> columns
-  2: required list<TRangeLiteralList> split_rows;
-}
-
-// Parameters for the DISTRIBUTE BY clause. The actual distribution is identified by
-// the type parameter.
-struct TDistributeParam {
-  // Set if type is set to HASH
-  1: optional TDistributeByHashParam by_hash_param;
-
-  // Set if type is set to RANGE
-  2: optional TDistributeByRangeParam by_range_param;
-}
-
 // Parameters for ALTER TABLE rename commands
 struct TAlterTableOrViewRenameParams {
   // The new table name
@@ -434,7 +395,10 @@ struct TCreateTableParams {
 
   // If set, the table is automatically distributed according to this parameter.
   // Kudu-only.
-  14: optional list<TDistributeParam> distribute_by;
+  14: optional list<CatalogObjects.TDistributeParam> distribute_by
+
+  // Primary key column names (Kudu-only)
+  15: optional list<string> primary_key_column_names;
 }
 
 // Parameters of a CREATE VIEW or ALTER VIEW AS SELECT command

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/041fa6d9/fe/src/main/cup/sql-parser.cup
----------------------------------------------------------------------
diff --git a/fe/src/main/cup/sql-parser.cup b/fe/src/main/cup/sql-parser.cup
index 6fc76f9..7554b5a 100644
--- a/fe/src/main/cup/sql-parser.cup
+++ b/fe/src/main/cup/sql-parser.cup
@@ -17,32 +17,33 @@
 
 package org.apache.impala.analysis;
 
-import org.apache.impala.catalog.Type;
-import org.apache.impala.catalog.ScalarType;
+import com.google.common.collect.Lists;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java_cup.runtime.Symbol;
+
+import org.apache.impala.analysis.ColumnDef;
+import org.apache.impala.analysis.UnionStmt.Qualifier;
+import org.apache.impala.analysis.UnionStmt.UnionOperand;
 import org.apache.impala.catalog.ArrayType;
 import org.apache.impala.catalog.MapType;
-import org.apache.impala.catalog.StructType;
-import org.apache.impala.catalog.StructField;
 import org.apache.impala.catalog.RowFormat;
+import org.apache.impala.catalog.ScalarType;
+import org.apache.impala.catalog.StructField;
+import org.apache.impala.catalog.StructType;
+import org.apache.impala.catalog.Type;
 import org.apache.impala.catalog.View;
 import org.apache.impala.common.AnalysisException;
 import org.apache.impala.common.Pair;
-import org.apache.impala.analysis.ColumnDef;
-import org.apache.impala.analysis.UnionStmt.UnionOperand;
-import org.apache.impala.analysis.UnionStmt.Qualifier;
 import org.apache.impala.thrift.TCatalogObjectType;
-import org.apache.impala.thrift.TFunctionCategory;
 import org.apache.impala.thrift.TDescribeOutputStyle;
+import org.apache.impala.thrift.TFunctionCategory;
 import org.apache.impala.thrift.THdfsFileFormat;
 import org.apache.impala.thrift.TPrivilegeLevel;
 import org.apache.impala.thrift.TTablePropertyType;
-import java.math.BigDecimal;
-import java.math.BigInteger;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java_cup.runtime.Symbol;
-import com.google.common.collect.Lists;
 
 parser code {:
   private Symbol errorToken_;
@@ -248,11 +249,11 @@ terminal
   KW_FUNCTION, KW_FUNCTIONS, KW_GRANT, KW_GROUP, KW_HASH, KW_IGNORE, KW_HAVING, KW_IF,
   KW_ILIKE, KW_IN, KW_INCREMENTAL, KW_INIT_FN, KW_INNER, KW_INPATH, KW_INSERT, KW_INT,
   KW_INTERMEDIATE, KW_INTERVAL, KW_INTO, KW_INVALIDATE, KW_IREGEXP, KW_IS, KW_JOIN,
-  KW_LAST, KW_LEFT, KW_LIKE, KW_LIMIT, KW_LINES, KW_LOAD, KW_LOCATION, KW_MAP,
+  KW_KUDU, KW_LAST, KW_LEFT, KW_LIKE, KW_LIMIT, KW_LINES, KW_LOAD, KW_LOCATION, KW_MAP,
   KW_MERGE_FN, KW_METADATA, KW_NOT, KW_NULL, KW_NULLS, KW_OFFSET, KW_ON, KW_OR, KW_ORDER,
   KW_OUTER, KW_OVER, KW_OVERWRITE, KW_PARQUET, KW_PARQUETFILE, KW_PARTITION,
-  KW_PARTITIONED, KW_PARTITIONS, KW_PRECEDING, KW_PREPARE_FN, KW_PRODUCED, KW_PURGE,
-  KW_RANGE, KW_RCFILE, KW_RECOVER, KW_REFRESH, KW_REGEXP, KW_RENAME, KW_REPLACE,
+  KW_PARTITIONED, KW_PARTITIONS, KW_PRECEDING, KW_PREPARE_FN, KW_PRIMARY, KW_PRODUCED,
+  KW_PURGE, KW_RANGE, KW_RCFILE, KW_RECOVER, KW_REFRESH, KW_REGEXP, KW_RENAME, KW_REPLACE,
   KW_REPLICATION, KW_RESTRICT, KW_RETURNS, KW_REVOKE, KW_RIGHT, KW_RLIKE, KW_ROLE,
   KW_ROLES, KW_ROW, KW_ROWS, KW_SCHEMA, KW_SCHEMAS, KW_SELECT, KW_SEMI, KW_SEQUENCEFILE,
   KW_SERDEPROPERTIES, KW_SERIALIZE_FN, KW_SET, KW_SHOW, KW_SMALLINT, KW_SPLIT, KW_STORED,
@@ -318,7 +319,7 @@ nonterminal SelectListItem star_expr;
 nonterminal Expr expr, non_pred_expr, arithmetic_expr, timestamp_arithmetic_expr;
 nonterminal ArrayList<Expr> expr_list;
 nonterminal String alias_clause;
-nonterminal ArrayList<String> ident_list;
+nonterminal ArrayList<String> ident_list, primary_keys, opt_primary_keys;
 nonterminal ArrayList<String> opt_ident_list;
 nonterminal TableName table_name;
 nonterminal FunctionName function_name;
@@ -383,8 +384,10 @@ nonterminal DropTableOrViewStmt drop_tbl_or_view_stmt;
 nonterminal CreateDbStmt create_db_stmt;
 nonterminal CreateTableAsSelectStmt create_tbl_as_select_stmt;
 nonterminal CreateTableLikeStmt create_tbl_like_stmt;
-nonterminal CreateTableLikeFileStmt create_tbl_like_file_stmt;
-nonterminal CreateTableStmt create_unpartitioned_tbl_stmt, create_partitioned_tbl_stmt;
+nonterminal CreateTableStmt create_tbl_stmt;
+nonterminal TableDef tbl_def_without_col_defs, tbl_def_with_col_defs;
+nonterminal TableDataLayout opt_tbl_data_layout, distributed_data_layout;
+nonterminal TableDef.Options tbl_options;
 nonterminal CreateViewStmt create_view_stmt;
 nonterminal CreateDataSrcStmt create_data_src_stmt;
 nonterminal DropDataSrcStmt drop_data_src_stmt;
@@ -393,14 +396,13 @@ nonterminal StructField struct_field_def;
 nonterminal String ident_or_keyword;
 nonterminal DistributeParam distribute_hash_param;
 nonterminal ArrayList<DistributeParam> distribute_hash_param_list;
-nonterminal ArrayList<DistributeParam> opt_distribute_param_list;
 nonterminal ArrayList<DistributeParam> distribute_param_list;
 nonterminal DistributeParam distribute_range_param;
-nonterminal ArrayList<ArrayList<LiteralExpr>> split_row_list;
-nonterminal ArrayList<LiteralExpr> literal_list;
+nonterminal List<List<LiteralExpr>> split_row_list;
+nonterminal List<LiteralExpr> literal_list;
 nonterminal ColumnDef column_def, view_column_def;
-nonterminal ArrayList<ColumnDef> column_def_list, view_column_def_list;
-nonterminal ArrayList<ColumnDef> partition_column_defs, view_column_defs;
+nonterminal ArrayList<ColumnDef> column_def_list, partition_column_defs,
+  view_column_def_list, view_column_defs;
 nonterminal ArrayList<StructField> struct_field_def_list;
 // Options for DDL commands - CREATE/DROP/ALTER
 nonterminal HdfsCachingOp cache_op_val;
@@ -413,6 +415,7 @@ nonterminal THdfsFileFormat file_format_val;
 nonterminal THdfsFileFormat file_format_create_table_val;
 nonterminal Boolean if_exists_val;
 nonterminal Boolean if_not_exists_val;
+nonterminal Boolean is_primary_key_val;
 nonterminal Boolean replace_existing_cols_val;
 nonterminal HdfsUri location_val;
 nonterminal RowFormat row_format_val;
@@ -451,11 +454,12 @@ nonterminal Boolean opt_kw_role;
 // To avoid creating common keywords such as 'SERVER' or 'SOURCES' we treat them as
 // identifiers rather than keywords. Throws a parse exception if the identifier does not
 // match the expected string.
+nonterminal key_ident;
+nonterminal Boolean option_ident;
+nonterminal Boolean server_ident;
 nonterminal Boolean source_ident;
 nonterminal Boolean sources_ident;
-nonterminal Boolean server_ident;
 nonterminal Boolean uri_ident;
-nonterminal Boolean option_ident;
 
 // For Create/Drop/Show function ddl
 nonterminal FunctionArgs function_def_args;
@@ -550,11 +554,7 @@ stmt ::=
   {: RESULT = create_tbl_as_select; :}
   | create_tbl_like_stmt:create_tbl_like
   {: RESULT = create_tbl_like; :}
-  | create_tbl_like_file_stmt:create_tbl_like_file
-  {: RESULT = create_tbl_like_file; :}
-  | create_unpartitioned_tbl_stmt:create_tbl
-  {: RESULT = create_tbl; :}
-  | create_partitioned_tbl_stmt:create_tbl
+  | create_tbl_stmt:create_tbl
   {: RESULT = create_tbl; :}
   | create_view_stmt:create_view
   {: RESULT = create_view; :}
@@ -940,134 +940,177 @@ create_db_stmt ::=
   {: RESULT = new CreateDbStmt(db_name, comment, location, if_not_exists); :}
   ;
 
-create_tbl_like_stmt ::=
-  KW_CREATE external_val:external KW_TABLE if_not_exists_val:if_not_exists
-  table_name:table KW_LIKE table_name:other_table comment_val:comment
-  KW_STORED KW_AS file_format_val:file_format location_val:location
+create_tbl_as_select_stmt ::=
+  tbl_def_without_col_defs:tbl_def
+  tbl_options:options
+  KW_AS query_stmt:select_stmt
+  {:
+    tbl_def.setOptions(options);
+    RESULT = new CreateTableAsSelectStmt(new CreateTableStmt(tbl_def), select_stmt, null);
+  :}
+  | tbl_def_without_col_defs:tbl_def
+    // An optional clause cannot be used directly below because it would conflict with
+    // the first rule in "create_tbl_stmt".
+    primary_keys:primary_keys
+    distributed_data_layout:distribute_params
+    tbl_options:options
+    KW_AS query_stmt:select_stmt
+  {:
+    tbl_def.getPrimaryKeyColumnNames().addAll(primary_keys);
+    tbl_def.getDistributeParams().addAll(distribute_params.getDistributeParams());
+    tbl_def.setOptions(options);
+    RESULT = new CreateTableAsSelectStmt(new CreateTableStmt(tbl_def), select_stmt, null);
+  :}
+  | tbl_def_without_col_defs:tbl_def
+    KW_PARTITIONED KW_BY LPAREN ident_list:partition_cols RPAREN
+    tbl_options:options
+    KW_AS query_stmt:select_stmt
+  {:
+    tbl_def.setOptions(options);
+    RESULT = new CreateTableAsSelectStmt(new CreateTableStmt(tbl_def),
+        select_stmt, partition_cols);
+  :}
+  ;
+
+create_tbl_stmt ::=
+  tbl_def_without_col_defs:tbl_def
+  tbl_options:options
+  {:
+    tbl_def.setOptions(options);
+    RESULT = new CreateTableStmt(tbl_def);
+  :}
+  | tbl_def_without_col_defs:tbl_def
+    // If "opt_tbl_data_layout" were used instead so that this rule could be combined with
+    // the rule above, there would be a conflict with the first rule in
+    // "create_tbl_as_select_stmt".
+    partition_column_defs:partition_column_defs
+    tbl_options:options
+  {:
+    tbl_def.setOptions(options);
+    CreateTableStmt create_tbl_stmt = new CreateTableStmt(tbl_def);
+    create_tbl_stmt.getPartitionColumnDefs().addAll(partition_column_defs);
+    RESULT = create_tbl_stmt;
+  :}
+  | tbl_def_with_col_defs:tbl_def
+    opt_tbl_data_layout:data_layout
+    tbl_options:options
+  {:
+    tbl_def.getPartitionColumnDefs().addAll(data_layout.getPartitionColumnDefs());
+    tbl_def.getDistributeParams().addAll(data_layout.getDistributeParams());
+    tbl_def.setOptions(options);
+    RESULT = new CreateTableStmt(tbl_def);
+  :}
+  | tbl_def_with_col_defs:tbl_def
+    KW_PRODUCED KW_BY KW_DATA source_ident:is_source_id IDENT:data_src_name
+    opt_init_string_val:init_string
+    comment_val:comment
   {:
-    RESULT = new CreateTableLikeStmt(table, other_table, external, comment,
-        file_format, location, if_not_exists);
+    // Need external_val in the grammar to avoid shift/reduce conflict with other
+    // CREATE TABLE statements.
+    if (tbl_def.isExternal()) {
+      parser.parseError("external", SqlParserSymbols.KW_EXTERNAL);
+    }
+    tbl_def.setOptions(new TableDef.Options(comment));
+    RESULT = new CreateTableDataSrcStmt(new CreateTableStmt(tbl_def),
+        data_src_name, init_string);
   :}
-  | KW_CREATE external_val:external KW_TABLE if_not_exists_val:if_not_exists
-    table_name:table KW_LIKE table_name:other_table comment_val:comment
-    location_val:location
+  | tbl_def_without_col_defs:tbl_def
+    KW_LIKE file_format_val:schema_file_format
+    STRING_LITERAL:schema_location
+    opt_tbl_data_layout:data_layout
+    tbl_options:options
   {:
-    RESULT = new CreateTableLikeStmt(table, other_table, external, comment,
-        null, location, if_not_exists);
+    tbl_def.getPartitionColumnDefs().addAll(data_layout.getPartitionColumnDefs());
+    tbl_def.getDistributeParams().addAll(data_layout.getDistributeParams());
+    tbl_def.setOptions(options);
+    RESULT = new CreateTableLikeFileStmt(new CreateTableStmt(tbl_def),
+        schema_file_format, new HdfsUri(schema_location));
   :}
   ;
 
-create_tbl_like_file_stmt ::=
-  KW_CREATE external_val:external KW_TABLE if_not_exists_val:if_not_exists
-  table_name:table KW_LIKE file_format_val:schema_file_format
-  STRING_LITERAL:schema_location partition_column_defs:partition_col_defs
-  comment_val:comment row_format_val:row_format serde_properties:serde_props
-  file_format_create_table_val:file_format location_val:location cache_op_val:cache_op
-  tbl_properties:tbl_props
+// The form of CREATE TABLE below should logically be grouped with the forms above but
+// 'create_tbl_stmt' must return a CreateTableStmt instance and CreateTableLikeFileStmt
+// class doesn't inherit from CreateTableStmt.
+// TODO: Refactor the CREATE TABLE statements to improve the grammar and the way we
+// handle table options.
+create_tbl_like_stmt ::=
+  tbl_def_without_col_defs:tbl_def
+  KW_LIKE table_name:other_table
+  comment_val:comment
+  file_format_create_table_val:file_format location_val:location
   {:
-    RESULT = new CreateTableLikeFileStmt(table, schema_file_format,
-        new HdfsUri(schema_location), partition_col_defs, external, comment, row_format,
-        file_format, location, cache_op, if_not_exists, tbl_props, serde_props);
+    RESULT = new CreateTableLikeStmt(tbl_def.getTblName(), other_table,
+        tbl_def.isExternal(), comment, file_format, location, tbl_def.getIfNotExists());
   :}
   ;
 
-create_tbl_as_select_stmt ::=
-  KW_CREATE external_val:external KW_TABLE if_not_exists_val:if_not_exists
-  table_name:table comment_val:comment row_format_val:row_format
-  serde_properties:serde_props file_format_create_table_val:file_format
-  location_val:location cache_op_val:cache_op distribute_param_list:distribute
-  tbl_properties:tbl_props
-  KW_AS query_stmt:query
-  {:
-    // Initialize with empty List of columns and partition columns. The
-    // columns will be added from the query statement during analysis
-    CreateTableStmt create_stmt = new CreateTableStmt(table, new ArrayList<ColumnDef>(),
-        new ArrayList<ColumnDef>(), external, comment, row_format, file_format, location,
-        cache_op, if_not_exists, tbl_props, serde_props, distribute);
-    RESULT = new CreateTableAsSelectStmt(create_stmt, query, null);
-  :}
-  // Create partitioned tables with CTAS statement. We need a separate production
-  // here, combining both into one causes an unresolvable reduce/reduce
-  // conflicts due to the optional clauses.
-  | KW_CREATE external_val:external KW_TABLE if_not_exists_val:if_not_exists
-  table_name:table KW_PARTITIONED KW_BY LPAREN ident_list:partition_cols RPAREN
-  comment_val:comment row_format_val:row_format
-  serde_properties:serde_props file_format_create_table_val:file_format
-  location_val:location cache_op_val:cache_op tbl_properties:tbl_props
-  KW_AS query_stmt:query
-  {:
-    // Initialize with empty list of columns. The columns will be added by the query
-    // statement during analysis.
-    CreateTableStmt create_stmt = new CreateTableStmt(table,
-        new ArrayList<ColumnDef>(), new ArrayList<ColumnDef>(), external,
-        comment, row_format, file_format, location, cache_op, if_not_exists, tbl_props,
-        serde_props, null);
-    RESULT = new CreateTableAsSelectStmt(create_stmt, query, partition_cols);
-  :}
-  | KW_CREATE external_val:external KW_TABLE if_not_exists_val:if_not_exists
-    table_name:table comment_val:comment row_format_val:row_format
-    serde_properties:serde_props file_format_create_table_val:file_format
-    location_val:location cache_op_val:cache_op tbl_properties:tbl_props
-    KW_AS query_stmt:query
-  {:
-    // Initialize with empty list of columns and partition columns. The
-    // columns will be added from the query statement during analysis
-    CreateTableStmt create_stmt = new CreateTableStmt(table, new ArrayList<ColumnDef>(),
-      new ArrayList<ColumnDef>(), external, comment, row_format, file_format, location,
-      cache_op, if_not_exists, tbl_props, serde_props,null);
-    RESULT = new CreateTableAsSelectStmt(create_stmt, query, null);
-  :}
-  ;
-
-// Create unpartitioned tables with and without column definitions.
-// We cannot coalesce this production with create_partitioned_tbl_stmt because
-// that results in an unresolvable reduce/reduce conflict due to the many
-// optional clauses (not clear which rule to reduce on 'empty').
-// TODO: Clean up by consolidating everything after the column defs and
-// partition clause into a CreateTableParams.
-create_unpartitioned_tbl_stmt ::=
+// Used for creating tables where the schema is inferred externally, e.g., from an Avro
+// schema, Kudu table or query statement.
+tbl_def_without_col_defs ::=
   KW_CREATE external_val:external KW_TABLE if_not_exists_val:if_not_exists
-  table_name:table LPAREN column_def_list:col_defs RPAREN comment_val:comment
-  row_format_val:row_format serde_properties:serde_props
-  file_format_create_table_val:file_format location_val:location cache_op_val:cache_op
-  opt_distribute_param_list:distribute
-  tbl_properties:tbl_props
+  table_name:table
+  {: RESULT = new TableDef(table, external, if_not_exists); :}
+  ;
+
+tbl_def_with_col_defs ::=
+  tbl_def_without_col_defs:tbl_def LPAREN column_def_list:list RPAREN
   {:
-    RESULT = new CreateTableStmt(table, col_defs, new ArrayList<ColumnDef>(), external,
-        comment, row_format, file_format, location, cache_op, if_not_exists, tbl_props,
-        serde_props, distribute);
+    tbl_def.getColumnDefs().addAll(list);
+    RESULT = tbl_def;
   :}
-  | KW_CREATE external_val:external KW_TABLE if_not_exists_val:if_not_exists
-    table_name:table comment_val:comment row_format_val:row_format
-    serde_properties:serde_props file_format_create_table_val:file_format
-    location_val:location cache_op_val:cache_op tbl_properties:tbl_props
+  | tbl_def_without_col_defs:tbl_def LPAREN column_def_list:list COMMA opt_primary_keys:primary_keys RPAREN
   {:
-    RESULT = new CreateTableStmt(table, new ArrayList<ColumnDef>(),
-        new ArrayList<ColumnDef>(), external, comment, row_format, file_format,
-        location, cache_op, if_not_exists, tbl_props, serde_props, null);
+    tbl_def.getColumnDefs().addAll(list);
+    tbl_def.getPrimaryKeyColumnNames().addAll(primary_keys);
+    RESULT = tbl_def;
   :}
-  | KW_CREATE external_val:external KW_TABLE if_not_exists_val:if_not_exists
-    table_name:table LPAREN column_def_list:col_defs RPAREN
-    KW_PRODUCED KW_BY KW_DATA source_ident:is_source_id IDENT:data_src_name
-    opt_init_string_val:init_string comment_val:comment
+  ;
+
+opt_primary_keys ::=
+  primary_keys:col_names
+  {: RESULT = col_names; :}
+  | /* empty */
+  {: RESULT = null; :}
+  ;
+
+primary_keys ::=
+  KW_PRIMARY key_ident LPAREN ident_list:col_names RPAREN
+  {: RESULT = col_names; :}
+  ;
+
+tbl_options ::=
+  comment_val:comment row_format_val:row_format serde_properties:serde_props
+  file_format_create_table_val:file_format location_val:location cache_op_val:cache_op
+  tbl_properties:tbl_props
   {:
-    // Need external_val in the grammar to avoid shift/reduce conflict with other
-    // CREATE TABLE statements.
-    if (external) parser.parseError("external", SqlParserSymbols.KW_EXTERNAL);
-    RESULT = new CreateTableDataSrcStmt(table, col_defs, data_src_name, init_string,
-        comment, if_not_exists);
+    CreateTableStmt.unescapeProperties(serde_props);
+    CreateTableStmt.unescapeProperties(tbl_props);
+    RESULT = new TableDef.Options(comment, row_format, serde_props, file_format,
+        location, cache_op, tbl_props);
   :}
   ;
 
-// The DISTRIBUTE clause contains any number of HASH() clauses followed by exactly zero
-// or one RANGE clause
-opt_distribute_param_list ::=
-  distribute_param_list:list
-  {: RESULT = list; :}
+opt_tbl_data_layout ::=
+  partition_column_defs:partition_column_defs
+  {: RESULT = TableDataLayout.createPartitionedLayout(partition_column_defs); :}
+  | distributed_data_layout:data_layout
+  {: RESULT = data_layout; :}
+  ;
+
+distributed_data_layout ::=
+  distribute_param_list:distribute_params
+  {: RESULT = TableDataLayout.createDistributedLayout(distribute_params); :}
   | /* empty */
-  {: RESULT = null; :}
+  {: RESULT = TableDataLayout.createEmptyLayout(); :}
   ;
 
+partition_column_defs ::=
+  KW_PARTITIONED KW_BY LPAREN column_def_list:col_defs RPAREN
+  {: RESULT = col_defs; :}
+  ;
+
+// The DISTRIBUTE clause contains any number of HASH() clauses followed by exactly zero
+// or one RANGE clause
 distribute_param_list ::=
   KW_DISTRIBUTE KW_BY distribute_hash_param_list:list
   {: RESULT = list; :}
@@ -1095,9 +1138,12 @@ distribute_hash_param_list ::=
 distribute_hash_param ::=
   KW_HASH LPAREN ident_list:cols RPAREN KW_INTO
     INTEGER_LITERAL:buckets KW_BUCKETS
-  {: RESULT = DistributeParam.createHashParam(cols, buckets); :}
+  {: RESULT = DistributeParam.createHashParam(cols, buckets.intValue()); :}
   | KW_HASH KW_INTO INTEGER_LITERAL:buckets KW_BUCKETS
-  {: RESULT = DistributeParam.createHashParam(null, buckets); :}
+  {:
+    RESULT = DistributeParam.createHashParam(Lists.<String>newArrayList(),
+        buckets.intValue());
+  :}
   ;
 
 // The column list for a RANGE clause is optional.
@@ -1106,12 +1152,12 @@ distribute_range_param ::=
   LPAREN split_row_list:list RPAREN
   {: RESULT = DistributeParam.createRangeParam(cols, list); :}
   | KW_RANGE KW_SPLIT KW_ROWS LPAREN split_row_list:list RPAREN
-  {: RESULT = DistributeParam.createRangeParam(null, list); :}
+  {: RESULT = DistributeParam.createRangeParam(Lists.<String>newArrayList(), list); :}
   ;
 
 split_row_list ::=
   LPAREN literal_list:l RPAREN
-  {: RESULT = Lists.<ArrayList<LiteralExpr>>newArrayList(l); :}
+  {: RESULT = Lists.<List<LiteralExpr>>newArrayList(l); :}
   | split_row_list:list COMMA LPAREN literal_list:l RPAREN
   {:
     list.add(l);
@@ -1129,34 +1175,6 @@ literal_list ::=
   :}
   ;
 
-// Create partitioned tables with and without column definitions.
-// TODO: Clean up by consolidating everything after the column defs and
-// partition clause into a CreateTableParams.
-create_partitioned_tbl_stmt ::=
-  KW_CREATE external_val:external KW_TABLE if_not_exists_val:if_not_exists
-  table_name:table LPAREN column_def_list:col_defs RPAREN KW_PARTITIONED KW_BY
-  LPAREN column_def_list:partition_col_defs RPAREN comment_val:comment
-  row_format_val:row_format serde_properties:serde_props
-  file_format_create_table_val:file_format location_val:location cache_op_val:cache_op
-  tbl_properties:tbl_props
-  {:
-    RESULT = new CreateTableStmt(table, col_defs, partition_col_defs, external, comment,
-        row_format, file_format, location, cache_op, if_not_exists, tbl_props,
-        serde_props, null);
-  :}
-  | KW_CREATE external_val:external KW_TABLE if_not_exists_val:if_not_exists
-    table_name:table KW_PARTITIONED KW_BY
-    LPAREN column_def_list:partition_col_defs RPAREN
-    comment_val:comment row_format_val:row_format serde_properties:serde_props
-    file_format_create_table_val:file_format location_val:location cache_op_val:cache_op
-    tbl_properties:tbl_props
-  {:
-    RESULT = new CreateTableStmt(table, new ArrayList<ColumnDef>(), partition_col_defs,
-        external, comment, row_format, file_format, location, cache_op, if_not_exists,
-        tbl_props, serde_props, null);
-  :}
-  ;
-
 create_udf_stmt ::=
   KW_CREATE KW_FUNCTION if_not_exists_val:if_not_exists
   function_name:fn_name function_def_args:fn_args
@@ -1252,7 +1270,7 @@ row_format_val ::=
   escaped_by_val:escaped_by line_terminator_val:line_terminator
   {: RESULT = new RowFormat(field_terminator, line_terminator, escaped_by); :}
   |/* empty */
-  {: RESULT = RowFormat.DEFAULT_ROW_FORMAT; :}
+  {: RESULT = null; :}
   ;
 
 escaped_by_val ::=
@@ -1284,12 +1302,14 @@ terminator_val ::=
 file_format_create_table_val ::=
   KW_STORED KW_AS file_format_val:file_format
   {: RESULT = file_format; :}
-  | /* empty - default to TEXT */
-  {: RESULT = THdfsFileFormat.TEXT; :}
+  |
+  {: RESULT = null; :}
   ;
 
 file_format_val ::=
-  KW_PARQUET
+  KW_KUDU
+  {: RESULT = THdfsFileFormat.KUDU; :}
+  | KW_PARQUET
   {: RESULT = THdfsFileFormat.PARQUET; :}
   | KW_PARQUETFILE
   {: RESULT = THdfsFileFormat.PARQUET; :}
@@ -1307,14 +1327,14 @@ tbl_properties ::=
   KW_TBLPROPERTIES LPAREN properties_map:map RPAREN
   {: RESULT = map; :}
   | /* empty */
-  {: RESULT = null; :}
+  {: RESULT = new HashMap<String, String>(); :}
   ;
 
 serde_properties ::=
   KW_WITH KW_SERDEPROPERTIES LPAREN properties_map:map RPAREN
   {: RESULT = map; :}
   | /* empty */
-  {: RESULT = null; :}
+  {: RESULT = new HashMap<String, String>(); :}
   ;
 
 properties_map ::=
@@ -1331,17 +1351,10 @@ properties_map ::=
   :}
   ;
 
-partition_column_defs ::=
-  KW_PARTITIONED KW_BY LPAREN column_def_list:col_defs RPAREN
-  {: RESULT = col_defs; :}
-  | /* Empty - not a partitioned table */
-  {: RESULT = new ArrayList<ColumnDef>(); :}
-  ;
-
 column_def_list ::=
   column_def:col_def
   {:
-    ArrayList<ColumnDef> list = new ArrayList<ColumnDef>();
+    ArrayList<ColumnDef> list = Lists.newArrayList();
     list.add(col_def);
     RESULT = list;
   :}
@@ -1353,8 +1366,15 @@ column_def_list ::=
   ;
 
 column_def ::=
-  IDENT:col_name type_def:type comment_val:comment
-  {: RESULT = new ColumnDef(col_name, type, comment); :}
+  IDENT:col_name type_def:type is_primary_key_val:primary_key comment_val:comment
+  {: RESULT = new ColumnDef(col_name, type, primary_key, comment); :}
+  ;
+
+is_primary_key_val ::=
+  KW_PRIMARY key_ident
+  {: RESULT = true; :}
+  | /* empty */
+  {: RESULT = false; :}
   ;
 
 create_view_stmt ::=
@@ -1377,6 +1397,15 @@ create_data_src_stmt ::=
   :}
   ;
 
+key_ident ::=
+  IDENT:ident
+  {:
+    if (!ident.toUpperCase().equals("KEY")) {
+      parser.parseError("identifier", SqlParserSymbols.IDENT, "KEY");
+    }
+  :}
+  ;
+
 source_ident ::=
   IDENT:ident
   {:
@@ -2996,6 +3025,8 @@ ident_or_keyword ::=
   {: RESULT = r.toString(); :}
   | KW_JOIN:r
   {: RESULT = r.toString(); :}
+  | KW_KUDU:r
+  {: RESULT = r.toString(); :}
   | KW_LAST:r
   {: RESULT = r.toString(); :}
   | KW_LEFT:r
@@ -3050,6 +3081,8 @@ ident_or_keyword ::=
   {: RESULT = r.toString(); :}
   | KW_PREPARE_FN:r
   {: RESULT = r.toString(); :}
+  | KW_PRIMARY:r
+  {: RESULT = r.toString(); :}
   | KW_PRODUCED:r
   {: RESULT = r.toString(); :}
   | KW_PURGE:r

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/041fa6d9/fe/src/main/java/org/apache/impala/analysis/AnalysisUtils.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/AnalysisUtils.java b/fe/src/main/java/org/apache/impala/analysis/AnalysisUtils.java
new file mode 100644
index 0000000..2a3294e
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/analysis/AnalysisUtils.java
@@ -0,0 +1,43 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.analysis;
+
+import org.apache.impala.common.AnalysisException;
+
+import java.util.Collection;
+
+class AnalysisUtils {
+
+  static <T> void throwIfNotNull(T o, String message) throws AnalysisException {
+    if (o != null) throw new AnalysisException(message);
+  }
+
+  static void throwIfNotEmpty(Collection<?> c, String message)
+      throws AnalysisException {
+    if (c != null && !c.isEmpty()) throw new AnalysisException(message);
+  }
+
+  static <T> void throwIfNull(T o, String message) throws AnalysisException {
+    if (o == null) throw new AnalysisException(message);
+  }
+
+  static void throwIfNullOrEmpty(Collection<?> c, String message)
+      throws AnalysisException {
+    if (c == null || c.isEmpty()) throw new AnalysisException(message);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/041fa6d9/fe/src/main/java/org/apache/impala/analysis/ColumnDef.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/ColumnDef.java b/fe/src/main/java/org/apache/impala/analysis/ColumnDef.java
index 6b2a1d2..1b634f7 100644
--- a/fe/src/main/java/org/apache/impala/analysis/ColumnDef.java
+++ b/fe/src/main/java/org/apache/impala/analysis/ColumnDef.java
@@ -17,8 +17,15 @@
 
 package org.apache.impala.analysis;
 
+import java.util.Collection;
 import java.util.List;
+import java.util.Map;
 
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.commons.lang3.builder.EqualsBuilder;
 import org.apache.hadoop.hive.metastore.MetaStoreUtils;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 
@@ -26,9 +33,6 @@ import org.apache.impala.catalog.Type;
 import org.apache.impala.common.AnalysisException;
 import org.apache.impala.thrift.TColumn;
 import org.apache.impala.util.MetaStoreUtil;
-import com.google.common.base.Function;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
 
 /**
  * Represents a column definition in a CREATE/ALTER TABLE/VIEW statement.
@@ -47,9 +51,19 @@ public class ColumnDef {
   private final TypeDef typeDef_;
   private Type type_;
 
+  // Set to true if the user specified "PRIMARY KEY" in the column definition. Kudu table
+  // definitions may use this.
+  private boolean isPrimaryKey_;
+
   public ColumnDef(String colName, TypeDef typeDef, String comment) {
+    this(colName, typeDef, false, comment);
+  }
+
+  public ColumnDef(String colName, TypeDef typeDef, boolean isPrimaryKey,
+      String comment) {
     colName_ = colName.toLowerCase();
     typeDef_ = typeDef;
+    isPrimaryKey_ = isPrimaryKey;
     comment_ = comment;
   }
 
@@ -67,13 +81,15 @@ public class ColumnDef {
     colName_ = fs.getName();
     typeDef_ = new TypeDef(type);
     comment_ = fs.getComment();
+    isPrimaryKey_ = false;
     analyze();
   }
 
+  public String getColName() { return colName_; }
   public void setType(Type type) { type_ = type; }
   public Type getType() { return type_; }
   public TypeDef getTypeDef() { return typeDef_; }
-  public String getColName() { return colName_; }
+  boolean isPrimaryKey() { return isPrimaryKey_; }
   public void setComment(String comment) { comment_ = comment; }
   public String getComment() { return comment_; }
 
@@ -107,16 +123,32 @@ public class ColumnDef {
 
   @Override
   public String toString() {
-    StringBuilder sb = new StringBuilder(colName_);
+    StringBuilder sb = new StringBuilder(colName_).append(" ");
     if (type_ != null) {
-      sb.append(" " + type_.toString());
+      sb.append(type_);
     } else {
-      sb.append(" " + typeDef_.toString());
+      sb.append(typeDef_);
     }
+    if (isPrimaryKey_) sb.append(" PRIMARY KEY");
     if (comment_ != null) sb.append(String.format(" COMMENT '%s'", comment_));
     return sb.toString();
   }
 
+  @Override
+  public boolean equals(Object obj) {
+    if (obj == null) return false;
+    if (obj == this) return true;
+    if (obj.getClass() != getClass()) return false;
+    ColumnDef rhs = (ColumnDef) obj;
+    return new EqualsBuilder()
+        .append(colName_, rhs.colName_)
+        .append(comment_, rhs.comment_)
+        .append(isPrimaryKey_, rhs.isPrimaryKey_)
+        .append(typeDef_, rhs.typeDef_)
+        .append(type_, rhs.type_)
+        .isEquals();
+  }
+
   public TColumn toThrift() {
     TColumn col = new TColumn(new TColumn(getColName(), type_.toThrift()));
     col.setComment(getComment());
@@ -140,4 +172,24 @@ public class ColumnDef {
     });
   }
 
+  static List<String> toColumnNames(Collection<ColumnDef> colDefs) {
+    List<String> colNames = Lists.newArrayList();
+    for (ColumnDef colDef: colDefs) {
+      colNames.add(colDef.getColName());
+    }
+    return colNames;
+  }
+
+  /**
+   * Generates and returns a map of column names to column definitions. Assumes that
+   * the column names are unique.
+   */
+  static Map<String, ColumnDef> mapByColumnNames(Collection<ColumnDef> colDefs) {
+    Map<String, ColumnDef> colDefsByColName = Maps.newHashMap();
+    for (ColumnDef colDef: colDefs) {
+      ColumnDef def = colDefsByColName.put(colDef.getColName(), colDef);
+      Preconditions.checkState(def == null);
+    }
+    return colDefsByColName;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/041fa6d9/fe/src/main/java/org/apache/impala/analysis/CreateTableAsSelectStmt.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/CreateTableAsSelectStmt.java b/fe/src/main/java/org/apache/impala/analysis/CreateTableAsSelectStmt.java
index b2a95c4..816af80 100644
--- a/fe/src/main/java/org/apache/impala/analysis/CreateTableAsSelectStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/CreateTableAsSelectStmt.java
@@ -27,7 +27,6 @@ import org.apache.impala.catalog.KuduTable;
 import org.apache.impala.catalog.MetaStoreClientPool.MetaStoreClient;
 import org.apache.impala.catalog.Table;
 import org.apache.impala.catalog.TableId;
-import org.apache.impala.catalog.TableLoadingException;
 import org.apache.impala.common.AnalysisException;
 import org.apache.impala.service.CatalogOpExecutor;
 import org.apache.impala.thrift.THdfsFileFormat;
@@ -62,7 +61,7 @@ public class CreateTableAsSelectStmt extends StatementBase {
   /////////////////////////////////////////
 
   private final static EnumSet<THdfsFileFormat> SUPPORTED_INSERT_FORMATS =
-      EnumSet.of(THdfsFileFormat.PARQUET, THdfsFileFormat.TEXT);
+      EnumSet.of(THdfsFileFormat.PARQUET, THdfsFileFormat.TEXT, THdfsFileFormat.KUDU);
 
   /**
    * Builds a CREATE TABLE AS SELECT statement
@@ -95,6 +94,18 @@ public class CreateTableAsSelectStmt extends StatementBase {
     if (isAnalyzed()) return;
     super.analyze(analyzer);
 
+    if (!SUPPORTED_INSERT_FORMATS.contains(createStmt_.getFileFormat())) {
+      throw new AnalysisException(String.format("CREATE TABLE AS SELECT " +
+          "does not support the (%s) file format. Supported formats are: (%s)",
+          createStmt_.getFileFormat().toString().replace("_", ""),
+          "PARQUET, TEXTFILE, KUDU"));
+    }
+    if (createStmt_.getFileFormat() == THdfsFileFormat.KUDU && createStmt_.isExternal()) {
+      // TODO: Add support for CTAS on external Kudu tables (see IMPALA-4318)
+      throw new AnalysisException(String.format("CREATE TABLE AS SELECT is not " +
+          "supported for external Kudu tables."));
+    }
+
     // The analysis for CTAS happens in two phases - the first phase happens before
     // the target table exists and we want to validate the CREATE statement and the
     // query portion of the insert statement. If this passes, analysis will be run
@@ -154,12 +165,6 @@ public class CreateTableAsSelectStmt extends StatementBase {
     }
     createStmt_.analyze(analyzer);
 
-    if (!SUPPORTED_INSERT_FORMATS.contains(createStmt_.getFileFormat())) {
-      throw new AnalysisException(String.format("CREATE TABLE AS SELECT " +
-          "does not support (%s) file format. Supported formats are: (%s)",
-          createStmt_.getFileFormat().toString().replace("_", ""),
-          "PARQUET, TEXTFILE"));
-    }
 
     // The full privilege check for the database will be done as part of the INSERT
     // analysis.
@@ -188,14 +193,20 @@ public class CreateTableAsSelectStmt extends StatementBase {
       // SelectStmt (or the BE will be very confused). To ensure the ID is unique within
       // this query, just assign it the invalid table ID. The CatalogServer will assign
       // this table a proper ID once it is created there as part of the CTAS execution.
-      Table table = Table.fromMetastoreTable(TableId.createInvalidId(), db, msTbl);
-      Preconditions.checkState(table != null &&
-          (table instanceof HdfsTable || table instanceof KuduTable));
+      Table tmpTable = null;
+      if (KuduTable.isKuduTable(msTbl)) {
+        tmpTable = KuduTable.createCtasTarget(db, msTbl, createStmt_.getColumnDefs(),
+            createStmt_.getTblPrimaryKeyColumnNames(), createStmt_.getDistributeParams());
+      } else {
+        // TODO: Creating a tmp table using load() is confusing.
+        // Refactor it to use a 'createCtasTarget()' function similar to Kudu table.
+        tmpTable = Table.fromMetastoreTable(TableId.createInvalidId(), db, msTbl);
+        tmpTable.load(true, client.getHiveClient(), msTbl);
+      }
+      Preconditions.checkState(tmpTable != null &&
+          (tmpTable instanceof HdfsTable || tmpTable instanceof KuduTable));
 
-      table.load(true, client.getHiveClient(), msTbl);
-      insertStmt_.setTargetTable(table);
-    } catch (TableLoadingException e) {
-      throw new AnalysisException(e.getMessage(), e);
+      insertStmt_.setTargetTable(tmpTable);
     } catch (Exception e) {
       throw new AnalysisException(e.getMessage(), e);
     }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/041fa6d9/fe/src/main/java/org/apache/impala/analysis/CreateTableDataSrcStmt.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/CreateTableDataSrcStmt.java b/fe/src/main/java/org/apache/impala/analysis/CreateTableDataSrcStmt.java
index 3c54dfd..1df8280 100644
--- a/fe/src/main/java/org/apache/impala/analysis/CreateTableDataSrcStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/CreateTableDataSrcStmt.java
@@ -23,19 +23,12 @@ import static org.apache.impala.catalog.DataSourceTable.TBL_PROP_DATA_SRC_NAME;
 import static org.apache.impala.catalog.DataSourceTable.TBL_PROP_INIT_STRING;
 import static org.apache.impala.catalog.DataSourceTable.TBL_PROP_LOCATION;
 
-import java.util.List;
-import java.util.Map;
-
 import org.apache.impala.authorization.Privilege;
 import org.apache.impala.catalog.DataSource;
 import org.apache.impala.catalog.DataSourceTable;
-import org.apache.impala.catalog.RowFormat;
 import org.apache.impala.common.AnalysisException;
-import org.apache.impala.thrift.THdfsFileFormat;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
 import org.apache.hadoop.fs.permission.FsAction;
 
 /**
@@ -46,25 +39,12 @@ import org.apache.hadoop.fs.permission.FsAction;
  */
 public class CreateTableDataSrcStmt extends CreateTableStmt {
 
-  public CreateTableDataSrcStmt(TableName tableName, List<ColumnDef> columnDefs,
-      String dataSourceName, String initString, String comment, boolean ifNotExists) {
-    super(tableName, columnDefs, Lists.<ColumnDef>newArrayList(), false, comment,
-        RowFormat.DEFAULT_ROW_FORMAT, THdfsFileFormat.TEXT, null, null, ifNotExists,
-        createInitialTableProperties(dataSourceName, initString),
-        Maps.<String, String>newHashMap(), null);
-  }
-
-  /**
-   * Creates the initial map of table properties containing the name of the data
-   * source and the table init string.
-   */
-  private static Map<String, String> createInitialTableProperties(
-      String dataSourceName, String initString) {
+  public CreateTableDataSrcStmt(CreateTableStmt createTableStmt, String dataSourceName,
+      String initString) {
+    super(createTableStmt);
     Preconditions.checkNotNull(dataSourceName);
-    Map<String, String> tableProperties = Maps.newHashMap();
-    tableProperties.put(TBL_PROP_DATA_SRC_NAME, dataSourceName.toLowerCase());
-    tableProperties.put(TBL_PROP_INIT_STRING, Strings.nullToEmpty(initString));
-    return tableProperties;
+    getTblProperties().put(TBL_PROP_DATA_SRC_NAME, dataSourceName.toLowerCase());
+    getTblProperties().put(TBL_PROP_INIT_STRING, Strings.nullToEmpty(initString));
   }
 
   @Override

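The super(createTableStmt) call above relies on CreateTableStmt exposing a copy
constructor; a rough usage sketch under that assumption follows, where
parseCreateTable() is a hypothetical stand-in for the parser:

    // Hypothetical sketch: the parser first produces a plain CreateTableStmt,
    // and the data-source variant then wraps it, layering its two required
    // properties on top of whatever the user specified in TBLPROPERTIES.
    CreateTableStmt base = parseCreateTable(  // hypothetical helper
        "create table ds_tbl (x int) produced by data source my_src('init')");
    CreateTableDataSrcStmt stmt =
        new CreateTableDataSrcStmt(base, "my_src", "init");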
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/041fa6d9/fe/src/main/java/org/apache/impala/analysis/CreateTableLikeFileStmt.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/CreateTableLikeFileStmt.java b/fe/src/main/java/org/apache/impala/analysis/CreateTableLikeFileStmt.java
index a9a8a90..a653323 100644
--- a/fe/src/main/java/org/apache/impala/analysis/CreateTableLikeFileStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/CreateTableLikeFileStmt.java
@@ -21,12 +21,12 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
 
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsAction;
-
 import parquet.hadoop.ParquetFileReader;
 import parquet.hadoop.metadata.ParquetMetadata;
 import parquet.schema.OriginalType;
@@ -36,8 +36,8 @@ import org.apache.impala.authorization.Privilege;
 import org.apache.impala.catalog.ArrayType;
 import org.apache.impala.catalog.HdfsCompression;
 import org.apache.impala.catalog.HdfsFileFormat;
+import org.apache.impala.catalog.KuduTable;
 import org.apache.impala.catalog.MapType;
-import org.apache.impala.catalog.RowFormat;
 import org.apache.impala.catalog.ScalarType;
 import org.apache.impala.catalog.StructField;
 import org.apache.impala.catalog.StructType;
@@ -45,8 +45,6 @@ import org.apache.impala.catalog.Type;
 import org.apache.impala.common.AnalysisException;
 import org.apache.impala.common.FileSystemUtil;
 import org.apache.impala.thrift.THdfsFileFormat;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
 
 
 /**
@@ -60,16 +58,9 @@ public class CreateTableLikeFileStmt extends CreateTableStmt {
   private final static String ERROR_MSG =
       "Failed to convert Parquet type\n%s\nto an Impala %s type:\n%s\n";
 
-  public CreateTableLikeFileStmt(TableName tableName, THdfsFileFormat schemaFileFormat,
-      HdfsUri schemaLocation, List<ColumnDef> partitionColumnDescs,
-      boolean isExternal, String comment, RowFormat rowFormat,
-      THdfsFileFormat fileFormat, HdfsUri location, HdfsCachingOp cachingOp,
-      boolean ifNotExists, Map<String, String> tblProperties,
-      Map<String, String> serdeProperties) {
-    super(tableName, new ArrayList<ColumnDef>(), partitionColumnDescs,
-        isExternal, comment, rowFormat,
-        fileFormat, location, cachingOp, ifNotExists, tblProperties, serdeProperties,
-        null);
+  public CreateTableLikeFileStmt(CreateTableStmt createTableStmt,
+      THdfsFileFormat schemaFileFormat, HdfsUri schemaLocation) {
+    super(createTableStmt);
     schemaLocation_ = schemaLocation;
     schemaFileFormat_ = schemaFileFormat;
   }
@@ -351,8 +342,8 @@ public class CreateTableLikeFileStmt extends CreateTableStmt {
         schemaLocation_.toString());
     String s = ToSqlUtils.getCreateTableSql(getDb(),
         getTbl() + " __LIKE_FILEFORMAT__ ",  getComment(), colsSql, partitionColsSql,
-        getTblProperties(), getSerdeProperties(), isExternal(), getIfNotExists(),
-        getRowFormat(), HdfsFileFormat.fromThrift(getFileFormat()),
+        null, null, getTblProperties(), getSerdeProperties(), isExternal(),
+        getIfNotExists(), getRowFormat(), HdfsFileFormat.fromThrift(getFileFormat()),
         compression, null, getLocation());
     s = s.replace("__LIKE_FILEFORMAT__", "LIKE " + schemaFileFormat_ + " " +
         schemaLocation_.toString());
@@ -361,6 +352,10 @@ public class CreateTableLikeFileStmt extends CreateTableStmt {
 
   @Override
   public void analyze(Analyzer analyzer) throws AnalysisException {
+    if (getFileFormat() == THdfsFileFormat.KUDU) {
+      throw new AnalysisException("CREATE TABLE LIKE FILE statement is not supported " +
+          "for Kudu tables.");
+    }
     schemaLocation_.analyze(analyzer, Privilege.ALL, FsAction.READ_WRITE);
     switch (schemaFileFormat_) {
       case PARQUET:

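A sketch of how the new check in analyze() would surface to users; the
AnalysisError() frontend-test helper is assumed here and the file path is
illustrative:

    // Hypothetical test sketch: LIKE FILE combined with STORED AS KUDU should
    // now fail fast in analyze() with the message added above.
    AnalysisError("create table t like parquet '/test/file.parquet' " +
        "stored as kudu",
        "CREATE TABLE LIKE FILE statement is not supported for Kudu tables.");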
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/041fa6d9/fe/src/main/java/org/apache/impala/analysis/CreateTableLikeStmt.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/CreateTableLikeStmt.java b/fe/src/main/java/org/apache/impala/analysis/CreateTableLikeStmt.java
index 72843e8..6fde627 100644
--- a/fe/src/main/java/org/apache/impala/analysis/CreateTableLikeStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/CreateTableLikeStmt.java
@@ -17,16 +17,18 @@
 
 package org.apache.impala.analysis;
 
+import com.google.common.base.Preconditions;
 import org.apache.hadoop.fs.permission.FsAction;
 
 import org.apache.impala.authorization.Privilege;
+import org.apache.impala.catalog.KuduTable;
+import org.apache.impala.catalog.Table;
 import org.apache.impala.common.AnalysisException;
 import org.apache.impala.thrift.TAccessEvent;
 import org.apache.impala.thrift.TCatalogObjectType;
 import org.apache.impala.thrift.TCreateTableLikeParams;
 import org.apache.impala.thrift.THdfsFileFormat;
 import org.apache.impala.thrift.TTableName;
-import com.google.common.base.Preconditions;
 
 /**
  * Represents a CREATE TABLE LIKE statement which creates a new table based on
@@ -134,10 +136,19 @@ public class CreateTableLikeStmt extends StatementBase {
   public void analyze(Analyzer analyzer) throws AnalysisException {
     Preconditions.checkState(tableName_ != null && !tableName_.isEmpty());
     Preconditions.checkState(srcTableName_ != null && !srcTableName_.isEmpty());
+    // We currently don't support creating a Kudu table using a CREATE TABLE LIKE
+    // statement (see IMPALA-4052).
+    if (fileFormat_ == THdfsFileFormat.KUDU) {
+      throw new AnalysisException("CREATE TABLE LIKE is not supported for Kudu tables");
+    }
+
     // Make sure the source table exists and the user has permission to access it.
-    srcDbName_ = analyzer
-        .getTable(srcTableName_, Privilege.VIEW_METADATA)
-        .getDb().getName();
+    Table srcTable = analyzer.getTable(srcTableName_, Privilege.VIEW_METADATA);
+    if (KuduTable.isKuduTable(srcTable.getMetaStoreTable())) {
+      throw new AnalysisException("Cloning a Kudu table using CREATE TABLE LIKE is " +
+          "not supported.");
+    }
+    srcDbName_ = srcTable.getDb().getName();
     tableName_.analyze();
     dbName_ = analyzer.getTargetDbName(tableName_);
     owner_ = analyzer.getUser().getName();
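
Likewise, a sketch covering the two new rejections in this file; the test
helper and the functional_kudu source table are assumptions, not part of this
patch:

    // Hypothetical test sketch: Kudu as the target file format and Kudu as the
    // source table are both rejected during analysis.
    AnalysisError("create table t like functional.alltypes stored as kudu",
        "CREATE TABLE LIKE is not supported for Kudu tables");
    AnalysisError("create table t like functional_kudu.alltypes",
        "Cloning a Kudu table using CREATE TABLE LIKE is not supported.");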