Posted to commits@impala.apache.org by wz...@apache.org on 2023/02/04 17:47:56 UTC

[impala] 03/03: IMPALA-11809: Support non unique primary key for Kudu

This is an automated email from the ASF dual-hosted git repository.

wzhou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 40da36414ff4d46b5cdc53f068b1f0a5b28a0f1d
Author: wzhou-code <wz...@cloudera.com>
AuthorDate: Sun Nov 6 09:09:00 2022 -0800

    IMPALA-11809: Support non unique primary key for Kudu
    
    The Kudu engine recently enabled the auto-incrementing column feature
    (KUDU-1945). The feature works by appending a system-generated
    auto-incrementing column to the primary key columns to guarantee
    primary key uniqueness when the declared primary key columns may not
    be unique. The non unique primary key columns and the auto-incrementing
    column together form an effectively unique composite primary key.
    
    The auto-incrementing column is named 'auto_incrementing_id' and has
    BIGINT type. Values are assigned to it automatically during insertion,
    so INSERT statements must not specify values for the auto-incrementing
    column. In the current Kudu implementation there is no central key
    provider for auto-incrementing columns; Kudu uses a per-tablet-server
    counter to assign the values. The values of an auto-incrementing column
    are therefore not unique across a Kudu table, only within the
    contiguous region of the table served by a single tablet server.
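
    As a hedged illustration (the table and values here are invented for
    this example, not taken from the patch), an INSERT lists only the
    user-defined columns and may repeat non unique key values:

      CREATE TABLE t (i INT NON UNIQUE PRIMARY KEY, s STRING)
      PARTITION BY HASH (i) PARTITIONS 3
      STORED AS KUDU;

      -- 'auto_incrementing_id' is not listed; Kudu assigns it automatically.
      -- Both rows are accepted even though they share the key value 1.
      INSERT INTO t VALUES (1, 'a'), (1, 'b');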
    
    This patch also upgrades the Kudu version to 345fd44ca3 to pick up the
    Kudu changes needed to support non unique primary keys. It adds
    syntactic support for creating Kudu tables with a non unique primary
    key. When creating a Kudu table, specifying PRIMARY KEY is optional:
    if no primary key attribute is specified, the partition key columns
    are promoted to a non unique primary key, provided those columns are
    the leading columns of the table.
    A new column "key_unique" is added to the output of the 'describe'
    command for Kudu tables (a sketch follows the CREATE TABLE examples
    below).
    
    Examples of CREATE TABLE statements with a non unique primary key
    (the last example relies on partition-column promotion):
      CREATE TABLE tbl (i INT NON UNIQUE PRIMARY KEY, s STRING)
      PARTITION BY HASH (i) PARTITIONS 3
      STORED AS KUDU;
    
      CREATE TABLE tbl (i INT, s STRING, NON UNIQUE PRIMARY KEY(i))
      PARTITION BY HASH (i) PARTITIONS 3
      STORED AS KUDU;
    
      CREATE TABLE tbl NON UNIQUE PRIMARY KEY(id)
      PARTITION BY HASH (id) PARTITIONS 3
      STORED AS KUDU
      AS SELECT id, string_col FROM functional.alltypes WHERE id = 10;
    
      CREATE TABLE tbl NON UNIQUE PRIMARY KEY(id)
      PARTITION BY RANGE (id)
      (PARTITION VALUES <= 1000,
       PARTITION 1000 < VALUES <= 2000,
       PARTITION 2000 < VALUES <= 3000,
       PARTITION 3000 < VALUES)
      STORED AS KUDU
      AS SELECT id, int_col FROM functional.alltypestiny ORDER BY id ASC
       LIMIT 4000;
    
      CREATE TABLE tbl (id INT, name STRING, NON UNIQUE PRIMARY KEY(id))
      STORED AS KUDU;
    
      CREATE TABLE tbl (a INT, b STRING, c FLOAT)
      PARTITION BY HASH (a, b) PARTITIONS 3
      STORED AS KUDU;
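
    A hedged sketch of the DESCRIBE change (the behavior is summarized in
    comments; the exact column set and ordering of DESCRIBE output may
    differ from this sketch):

      DESCRIBE tbl;
      -- The result now includes a 'key_unique' column; for a table created
      -- with NON UNIQUE PRIMARY KEY it reports the key columns as non unique.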
    
    SELECT statements do not return the system-generated auto-incrementing
    column unless the column is explicitly named in the select list (star
    expansion omits it).
    The auto-incrementing column cannot be added, removed, or renamed with
    ALTER TABLE statements.
    UPSERT is currently not supported for Kudu tables with an
    auto-incrementing column due to a limitation in the Kudu engine.
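
    A hedged sketch of these rules, continuing the example table 't' from
    the earlier sketch (the comments summarize expected behavior and are
    not verbatim shell output):

      SELECT * FROM t;                           -- auto_incrementing_id is not returned
      SELECT i, s, auto_incrementing_id FROM t;  -- returned only when named explicitly
      ALTER TABLE t DROP COLUMN auto_incrementing_id;  -- rejected
      UPSERT INTO t VALUES (1, 'c');             -- rejected for this table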
    
    Testing:
     - Ran manual tests in impala-shell that create Kudu tables with
       non unique primary keys and exercised insert/update/delete
       operations on those tables.
     - Added frontend tests and end-to-end tests for Kudu tables with
       non unique primary keys.
     - Passed exhaustive tests.
    
    Change-Id: I4d7882bf3d01a3492cc9827c072d1f3200d9eebd
    Reviewed-on: http://gerrit.cloudera.org:8080/19383
    Reviewed-by: Riza Suminto <ri...@cloudera.com>
    Reviewed-by: Wenzhe Zhou <wz...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 bin/impala-config.sh                               |   6 +-
 common/thrift/CatalogObjects.thrift                |  10 ++
 common/thrift/JniCatalog.thrift                    |   3 +
 fe/src/main/cup/sql-parser.cup                     |  42 +++--
 .../impala/analysis/AlterTableAddColsStmt.java     |   7 +-
 .../impala/analysis/AlterTableAlterColStmt.java    |   6 +-
 .../java/org/apache/impala/analysis/ColumnDef.java |  26 ++-
 .../impala/analysis/CreateTableAsSelectStmt.java   |   2 +-
 .../impala/analysis/CreateTableLikeFileStmt.java   |   3 +-
 .../apache/impala/analysis/CreateTableStmt.java    |   2 +
 .../org/apache/impala/analysis/InsertStmt.java     |  18 +-
 .../org/apache/impala/analysis/ModifyStmt.java     |   6 +-
 .../org/apache/impala/analysis/SelectStmt.java     |   3 +
 .../java/org/apache/impala/analysis/TableDef.java  |  92 +++++++++--
 .../org/apache/impala/analysis/ToSqlUtils.java     |  28 ++--
 fe/src/main/java/org/apache/impala/catalog/Db.java |  10 +-
 .../main/java/org/apache/impala/catalog/FeDb.java  |   6 +-
 .../org/apache/impala/catalog/FeKuduTable.java     |  10 ++
 .../java/org/apache/impala/catalog/KuduColumn.java |  54 ++++--
 .../java/org/apache/impala/catalog/KuduTable.java  |  40 ++++-
 .../org/apache/impala/catalog/local/LocalDb.java   |  10 +-
 .../impala/catalog/local/LocalKuduTable.java       |  53 ++++--
 .../impala/service/DescribeResultFactory.java      |  10 +-
 .../java/org/apache/impala/service/Frontend.java   |   1 +
 .../impala/service/KuduCatalogOpExecutor.java      |  18 +-
 .../main/java/org/apache/impala/util/KuduUtil.java |  15 +-
 fe/src/main/jflex/sql-scanner.flex                 |   4 +-
 .../org/apache/impala/analysis/AnalyzeDDLTest.java |  31 +++-
 .../apache/impala/analysis/AnalyzeKuduDDLTest.java |  93 ++++++++++-
 .../org/apache/impala/analysis/ParserTest.java     |  13 +-
 .../queries/QueryTest/kudu-scan-node.test          |  81 +++++++++
 .../queries/QueryTest/kudu_alter.test              |  79 +++++++--
 .../queries/QueryTest/kudu_create.test             | 161 ++++++++++++++++++
 .../queries/QueryTest/kudu_delete.test             |  64 ++++++++
 .../queries/QueryTest/kudu_describe.test           | 182 +++++++++++++++------
 .../queries/QueryTest/kudu_hms_alter.test          |  18 +-
 .../queries/QueryTest/kudu_insert.test             | 146 +++++++++++++++++
 .../queries/QueryTest/kudu_partition_ddl.test      |  20 +--
 .../queries/QueryTest/kudu_stats.test              |  61 ++++++-
 .../queries/QueryTest/kudu_update.test             |  95 +++++++++++
 .../queries/QueryTest/kudu_upsert.test             |  15 ++
 tests/custom_cluster/test_kudu.py                  |   2 +-
 tests/metadata/test_ddl_base.py                    |   2 +-
 tests/query_test/test_kudu.py                      |  16 +-
 44 files changed, 1355 insertions(+), 209 deletions(-)

diff --git a/bin/impala-config.sh b/bin/impala-config.sh
index a02a8b694..c954b15d2 100755
--- a/bin/impala-config.sh
+++ b/bin/impala-config.sh
@@ -81,7 +81,7 @@ export USE_APACHE_HIVE=${USE_APACHE_HIVE-false}
 # moving to a different build of the toolchain, e.g. when a version is bumped or a
 # compile option is changed. The build id can be found in the output of the toolchain
 # build jobs, it is constructed from the build number and toolchain git hash prefix.
-export IMPALA_TOOLCHAIN_BUILD_ID=223-7cf7e75bc8
+export IMPALA_TOOLCHAIN_BUILD_ID=237-c284a9372e
 # Versions of toolchain dependencies.
 # -----------------------------------
 export IMPALA_AVRO_VERSION=1.7.4-p5
@@ -876,9 +876,7 @@ fi
 # overall build type) and does not apply when using a local Kudu build.
 export USE_KUDU_DEBUG_BUILD=${USE_KUDU_DEBUG_BUILD-false}
 
-# IMPALA-11441: This githash is a custom Kudu that is equivalent to the upstream
-# githash dc4031f693 plus a revert of KUDU-1644, which avoids IMPALA-11441.
-export IMPALA_KUDU_VERSION=${IMPALA_KUDU_VERSION-"956093dd9d"}
+export IMPALA_KUDU_VERSION=${IMPALA_KUDU_VERSION-"345fd44ca3"}
 export IMPALA_KUDU_HOME=${IMPALA_TOOLCHAIN_PACKAGES_HOME}/kudu-$IMPALA_KUDU_VERSION
 export IMPALA_KUDU_JAVA_HOME=\
 ${IMPALA_TOOLCHAIN_PACKAGES_HOME}/kudu-${IMPALA_KUDU_VERSION}/java
diff --git a/common/thrift/CatalogObjects.thrift b/common/thrift/CatalogObjects.thrift
index 60bf88164..12d87ab49 100644
--- a/common/thrift/CatalogObjects.thrift
+++ b/common/thrift/CatalogObjects.thrift
@@ -296,6 +296,8 @@ struct TColumn {
   18: optional i32 block_size
   // The column name, in the case that it appears in Kudu.
   19: optional string kudu_column_name
+  24: optional bool is_primary_key_unique
+  25: optional bool is_auto_incrementing
 
   // Here come the Iceberg-specific fields.
   20: optional bool is_iceberg_column
@@ -573,6 +575,14 @@ struct TKuduTable {
 
   // Partitioning
   4: required list<TKuduPartitionParam> partition_by
+
+  // Set to true if primary key of the Kudu table is unique.
+  // Kudu engine automatically adds an auto-incrementing column in the table if
+  // primary key is not unique, in this case, this field is set to false.
+  5: optional bool is_primary_key_unique
+
+  // Set to true if the table has auto-incrementing column
+  6: optional bool has_auto_incrementing
 }
 
 struct TIcebergPartitionTransform {
diff --git a/common/thrift/JniCatalog.thrift b/common/thrift/JniCatalog.thrift
index 79eb2989a..3efe0d0e5 100755
--- a/common/thrift/JniCatalog.thrift
+++ b/common/thrift/JniCatalog.thrift
@@ -594,6 +594,9 @@ struct TCreateTableParams {
 
   // Bucket desc for created bucketed table
   22: optional CatalogObjects.TBucketInfo bucket_info
+
+  // Primary key is unique (Kudu-only)
+  23: optional bool is_primary_key_unique
 }
 
 // Parameters of a CREATE VIEW or ALTER VIEW AS SELECT command
diff --git a/fe/src/main/cup/sql-parser.cup b/fe/src/main/cup/sql-parser.cup
index d0ea00956..dcf8eba5c 100755
--- a/fe/src/main/cup/sql-parser.cup
+++ b/fe/src/main/cup/sql-parser.cup
@@ -316,7 +316,7 @@ terminal
   KW_INTERMEDIATE, KW_INTERSECT, KW_INTERVAL, KW_INTO, KW_INVALIDATE, KW_IREGEXP, KW_IS,
   KW_JOIN, KW_JSONFILE, KW_KUDU, KW_LAST, KW_LEFT, KW_LEXICAL, KW_LIKE, KW_LIMIT, KW_LINES,
   KW_LOAD, KW_LOCATION, KW_LOGICAL_OR, KW_MANAGED_LOCATION, KW_MAP, KW_MERGE_FN,
-  KW_METADATA, KW_MINUS, KW_NORELY, KW_NOT,
+  KW_METADATA, KW_MINUS, KW_NON, KW_NORELY, KW_NOT,
   KW_NOVALIDATE, KW_NULL, KW_NULLS, KW_OF, KW_OFFSET, KW_ON, KW_OR,
   KW_ORC, KW_ORDER, KW_OUTER,
   KW_OVER, KW_OVERWRITE, KW_PARQUET, KW_PARQUETFILE, KW_PARTITION, KW_PARTITIONED,
@@ -329,9 +329,9 @@ terminal
   KW_STRING, KW_STRUCT, KW_SYMBOL, KW_SYSTEM_TIME, KW_SYSTEM_VERSION,
   KW_TABLE, KW_TABLES, KW_TABLESAMPLE, KW_TBLPROPERTIES,
   KW_TERMINATED, KW_TEXTFILE, KW_THEN, KW_TIMESTAMP, KW_TINYINT, KW_TRUNCATE, KW_STATS,
-  KW_TO, KW_TRUE, KW_UDF, KW_UNBOUNDED, KW_UNCACHED, KW_UNION, KW_UNKNOWN, KW_UNNEST, KW_UNSET,
-  KW_UPDATE, KW_UPDATE_FN, KW_UPSERT, KW_USE, KW_USING, KW_VALIDATE, KW_VALUES,
-  KW_VARCHAR, KW_VIEW, KW_WHEN, KW_WHERE, KW_WITH, KW_ZORDER;
+  KW_TO, KW_TRUE, KW_UDF, KW_UNBOUNDED, KW_UNCACHED, KW_UNION, KW_UNIQUE, KW_UNKNOWN,
+  KW_UNNEST, KW_UNSET, KW_UPDATE, KW_UPDATE_FN, KW_UPSERT, KW_USE, KW_USING, KW_VALIDATE,
+  KW_VALUES, KW_VARCHAR, KW_VIEW, KW_WHEN, KW_WHERE, KW_WITH, KW_ZORDER;
 
 terminal UNUSED_RESERVED_WORD;
 
@@ -402,7 +402,8 @@ nonterminal SelectListItem star_expr;
 nonterminal Expr expr, non_pred_expr, arithmetic_expr, timestamp_arithmetic_expr;
 nonterminal List<Expr> expr_list;
 nonterminal String alias_clause;
-nonterminal List<String> ident_list, primary_keys;
+nonterminal List<String> ident_list;
+nonterminal Pair<List<String>, Boolean> primary_keys;
 nonterminal List<String> opt_ident_list;
 nonterminal Pair<TBucketInfo, Pair<List<String>, TSortingOrder>> opt_clustered;
 nonterminal Pair<List<String>, TSortingOrder> opt_sort_cols;
@@ -536,7 +537,7 @@ nonterminal THdfsFileFormat storage_engine_val;
 nonterminal THdfsFileFormat file_format_create_table_val;
 nonterminal Boolean if_exists_val;
 nonterminal Boolean if_not_exists_val;
-nonterminal Boolean is_primary_key_val;
+nonterminal Pair<Boolean, Boolean> is_primary_key_val;
 nonterminal HdfsUri location_val;
 nonterminal HdfsUri managed_location_val;
 nonterminal RowFormat row_format_val, opt_row_format_val;
@@ -1417,7 +1418,8 @@ create_tbl_as_select_params ::=
     tbl_options:options
     KW_AS query_stmt:select_stmt
   {:
-    tbl_def.getPrimaryKeyColumnNames().addAll(primary_keys);
+    tbl_def.getPrimaryKeyColumnNames().addAll(primary_keys.first);
+    tbl_def.setPrimaryKeyUnique(primary_keys.second);
     tbl_def.getKuduPartitionParams().addAll(partition_params.getKuduPartitionParams());
     tbl_def.getIcebergPartitionSpecs().addAll(partition_params.getIcebergPartitionSpecs());
     tbl_def.setOptions(options);
@@ -1551,9 +1553,10 @@ tbl_def_with_col_defs ::=
     primary_keys:primary_keys RPAREN
   {:
     tbl_def.getColumnDefs().addAll(list);
-    tbl_def.getPrimaryKeyColumnNames().addAll(primary_keys);
+    tbl_def.getPrimaryKeyColumnNames().addAll(primary_keys.first);
+    tbl_def.setPrimaryKeyUnique(primary_keys.second);
     TableDef.PrimaryKey pk = new TableDef.PrimaryKey(tbl_def.getTblName(),
-        primary_keys, null, true, false, false);
+        primary_keys.first, null, true, false, false);
     tbl_def.setPrimaryKey(pk);
     RESULT = tbl_def;
   :}
@@ -1562,8 +1565,9 @@ tbl_def_with_col_defs ::=
       rely_spec:rely_spec RPAREN
   {:
     tbl_def.getColumnDefs().addAll(list);
+    tbl_def.setPrimaryKeyUnique(primary_keys.second);
     TableDef.PrimaryKey pk = new TableDef.PrimaryKey(tbl_def.getTblName(),
-        primary_keys, null, rely_spec, validate_spec, enable_spec);
+        primary_keys.first, null, rely_spec, validate_spec, enable_spec);
     tbl_def.setPrimaryKey(pk);
     RESULT = tbl_def;
   :}
@@ -1573,8 +1577,9 @@ tbl_def_with_col_defs ::=
     rely_spec:rely_spec COMMA foreign_keys_list:foreign_keys_list RPAREN
   {:
     tbl_def.getColumnDefs().addAll(list);
+    tbl_def.setPrimaryKeyUnique(primary_keys.second);
     TableDef.PrimaryKey pk = new TableDef.PrimaryKey(tbl_def.getTblName(),
-      primary_keys, null, rely_spec, validate_spec, enable_spec);
+      primary_keys.first, null, rely_spec, validate_spec, enable_spec);
     tbl_def.setPrimaryKey(pk);
     tbl_def.getForeignKeysList().addAll(foreign_keys_list);
     RESULT = tbl_def;
@@ -1585,8 +1590,9 @@ tbl_def_with_col_defs ::=
     enable_spec:enable_spec validate_spec:validate_spec rely_spec:rely_spec RPAREN
   {:
     tbl_def.getColumnDefs().addAll(list);
+    tbl_def.setPrimaryKeyUnique(primary_keys.second);
     TableDef.PrimaryKey pk = new TableDef.PrimaryKey(tbl_def.getTblName(),
-        primary_keys, null, rely_spec, validate_spec, enable_spec);
+        primary_keys.first, null, rely_spec, validate_spec, enable_spec);
     tbl_def.setPrimaryKey(pk);
     tbl_def.getForeignKeysList().addAll(foreign_keys_list);
     RESULT = tbl_def;
@@ -1623,7 +1629,9 @@ foreign_keys_list ::=
 
 primary_keys ::=
   KW_PRIMARY key_ident LPAREN ident_list:col_names RPAREN
-  {: RESULT = col_names; :}
+  {: RESULT = new Pair<List<String>, Boolean>(col_names, true); :}
+  | KW_NON KW_UNIQUE KW_PRIMARY key_ident LPAREN ident_list:col_names RPAREN
+  {: RESULT = new Pair<List<String>, Boolean>(col_names, false); :}
   ;
 
 rely_spec ::=
@@ -2227,7 +2235,9 @@ column_option ::=
 
 is_primary_key_val ::=
   KW_PRIMARY key_ident
-  {: RESULT = true; :}
+  {: RESULT = new Pair<Boolean, Boolean>(true, true); :}
+  | KW_NON KW_UNIQUE KW_PRIMARY key_ident
+  {: RESULT = new Pair<Boolean, Boolean>(true, false); :}
   ;
 
 nullability_val ::=
@@ -4346,6 +4356,8 @@ word ::=
   {: RESULT = r.toString(); :}
   | KW_MINUS:r
   {: RESULT = r.toString(); :}
+  | KW_NON:r
+  {: RESULT = r.toString(); :}
   | KW_NORELY:r
   {: RESULT = r.toString(); :}
   | KW_NOT:r
@@ -4514,6 +4526,8 @@ word ::=
   {: RESULT = r.toString(); :}
   | KW_UNION:r
   {: RESULT = r.toString(); :}
+  | KW_UNIQUE:r
+  {: RESULT = r.toString(); :}
   | KW_UNKNOWN:r
   {: RESULT = r.toString(); :}
   | KW_UNNEST:r
diff --git a/fe/src/main/java/org/apache/impala/analysis/AlterTableAddColsStmt.java b/fe/src/main/java/org/apache/impala/analysis/AlterTableAddColsStmt.java
index f8495378f..e373e9fa3 100644
--- a/fe/src/main/java/org/apache/impala/analysis/AlterTableAddColsStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/AlterTableAddColsStmt.java
@@ -24,10 +24,12 @@ import org.apache.impala.catalog.Column;
 import org.apache.impala.catalog.FeHBaseTable;
 import org.apache.impala.catalog.FeKuduTable;
 import org.apache.impala.catalog.FeTable;
+import org.apache.impala.catalog.KuduColumn;
 import org.apache.impala.common.AnalysisException;
 import org.apache.impala.thrift.TAlterTableAddColsParams;
 import org.apache.impala.thrift.TAlterTableParams;
 import org.apache.impala.thrift.TAlterTableType;
+import org.apache.impala.util.KuduUtil;
 
 import java.util.HashSet;
 import java.util.List;
@@ -93,8 +95,9 @@ public class AlterTableAddColsStmt extends AlterTableStmt {
               c.toString());
         }
         if (c.isPrimaryKey()) {
-          throw new AnalysisException("Cannot add a primary key using an ALTER TABLE " +
-              "ADD COLUMNS statement: " + c.toString());
+          throw new AnalysisException("Cannot add a " +
+              KuduUtil.getPrimaryKeyString(c.isPrimaryKeyUnique()) +
+              " using an ALTER TABLE ADD COLUMNS statement: " + c.toString());
         }
         if (c.isExplicitNotNullable() && !c.hasDefaultValue()) {
           throw new AnalysisException("A new non-null column must have a default " +
diff --git a/fe/src/main/java/org/apache/impala/analysis/AlterTableAlterColStmt.java b/fe/src/main/java/org/apache/impala/analysis/AlterTableAlterColStmt.java
index 6454e3764..6293c36f7 100644
--- a/fe/src/main/java/org/apache/impala/analysis/AlterTableAlterColStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/AlterTableAlterColStmt.java
@@ -156,6 +156,7 @@ public class AlterTableAlterColStmt extends AlterTableStmt {
     }
     if (t instanceof FeKuduTable) {
       KuduColumn col = (KuduColumn) t.getColumn(colName_);
+      boolean isSystemGeneratedColumn = col.isAutoIncrementing();
       if (!col.getType().equals(newColDef_.getType())) {
         throw new AnalysisException(String.format("Cannot change the type of a Kudu " +
             "column using an ALTER TABLE CHANGE COLUMN statement: (%s vs %s)",
@@ -163,8 +164,9 @@ public class AlterTableAlterColStmt extends AlterTableStmt {
       }
       if (col.isKey() && newColDef_.hasDefaultValue()) {
         throw new AnalysisException(String.format(
-            "Cannot %s default value for primary key column '%s'",
-            isDropDefault_ ? "drop" : "set", colName_));
+            "Cannot %s default value for %sprimary key column '%s'",
+            isDropDefault_ ? "drop" : "set",
+            isSystemGeneratedColumn ? "system generated " : "", colName_));
       }
       if (newColDef_.isPrimaryKey()) {
         throw new AnalysisException(
diff --git a/fe/src/main/java/org/apache/impala/analysis/ColumnDef.java b/fe/src/main/java/org/apache/impala/analysis/ColumnDef.java
index 75b1319a3..f491e2607 100644
--- a/fe/src/main/java/org/apache/impala/analysis/ColumnDef.java
+++ b/fe/src/main/java/org/apache/impala/analysis/ColumnDef.java
@@ -29,6 +29,7 @@ import org.apache.commons.lang3.builder.EqualsBuilder;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.impala.catalog.Type;
 import org.apache.impala.common.AnalysisException;
+import org.apache.impala.common.Pair;
 import org.apache.impala.compat.MetastoreShim;
 import org.apache.impala.service.FeSupport;
 import org.apache.impala.thrift.TColumn;
@@ -74,8 +75,11 @@ public class ColumnDef {
 
   // Kudu-specific column options
   //
-  // Set to true if the user specified "PRIMARY KEY" in the column definition.
+  // Set to true if the user specified "PRIMARY KEY" or "NON UNIQUE PRIMARY KEY" in the
+  // column definition.
   private boolean isPrimaryKey_;
+  // Set to false if the user specified "NON UNIQUE PRIMARY KEY" in the column definition.
+  private boolean isPrimaryKeyUnique_;
   // Set to true if this column may contain null values. Can be NULL if
   // not specified.
   private Boolean isNullable_;
@@ -106,8 +110,9 @@ public class ColumnDef {
     for (Map.Entry<Option, Object> option: options.entrySet()) {
       switch (option.getKey()) {
         case IS_PRIMARY_KEY:
-          Preconditions.checkState(option.getValue() instanceof Boolean);
-          isPrimaryKey_ = (Boolean) option.getValue();
+          Preconditions.checkState(option.getValue() instanceof Pair);
+          isPrimaryKey_ = ((Pair<Boolean, Boolean>)option.getValue()).first;
+          isPrimaryKeyUnique_ = ((Pair<Boolean, Boolean>)option.getValue()).second;
           break;
         case IS_NULLABLE:
           Preconditions.checkState(option.getValue() instanceof Boolean);
@@ -166,6 +171,7 @@ public class ColumnDef {
   public Type getType() { return type_; }
   public TypeDef getTypeDef() { return typeDef_; }
   boolean isPrimaryKey() { return isPrimaryKey_; }
+  boolean isPrimaryKeyUnique() { return isPrimaryKeyUnique_; }
   public void setComment(String comment) { comment_ = comment; }
   public String getComment() { return comment_; }
   public boolean hasKuduOptions() {
@@ -222,8 +228,8 @@ public class ColumnDef {
 
   private void analyzeKuduOptions(Analyzer analyzer) throws AnalysisException {
     if (isPrimaryKey_ && isNullable_ != null && isNullable_) {
-      throw new AnalysisException("Primary key columns cannot be nullable: " +
-          toString());
+      throw new AnalysisException(KuduUtil.getPrimaryKeyString(isPrimaryKeyUnique_) +
+          " columns cannot be nullable: " + toString());
     }
     // Encoding value
     if (encodingVal_ != null) {
@@ -332,7 +338,9 @@ public class ColumnDef {
     } else {
       sb.append(typeDef_.toSql());
     }
-    if (isPrimaryKey_) sb.append(" PRIMARY KEY");
+    if (isPrimaryKey_) {
+      sb.append(" ").append(KuduUtil.getPrimaryKeyString(isPrimaryKeyUnique_));
+    }
     if (isNullable_ != null) sb.append(isNullable_ ? " NULL" : " NOT NULL");
     if (encoding_ != null) sb.append(" ENCODING " + encoding_.toString());
     if (compression_ != null) sb.append(" COMPRESSION " + compression_.toString());
@@ -352,6 +360,7 @@ public class ColumnDef {
         .append(colName_, rhs.colName_)
         .append(comment_, rhs.comment_)
         .append(isPrimaryKey_, rhs.isPrimaryKey_)
+        .append(isPrimaryKeyUnique_, rhs.isPrimaryKeyUnique_)
         .append(typeDef_, rhs.typeDef_)
         .append(type_, rhs.type_)
         .append(isNullable_, rhs.isNullable_)
@@ -366,8 +375,9 @@ public class ColumnDef {
     TColumn col = new TColumn(getColName(), type_.toThrift());
     Integer blockSize =
         blockSize_ == null ? null : (int) ((NumericLiteral) blockSize_).getIntValue();
-    KuduUtil.setColumnOptions(col, isPrimaryKey_, isNullable_, encoding_,
-        compression_, outputDefaultValue_, blockSize, colName_);
+    KuduUtil.setColumnOptions(col, isPrimaryKey_, isPrimaryKeyUnique_, isNullable_,
+        /* isAutoIncrementing */false, encoding_, compression_, outputDefaultValue_,
+        blockSize, colName_);
     if (comment_ != null) col.setComment(comment_);
     return col;
   }
diff --git a/fe/src/main/java/org/apache/impala/analysis/CreateTableAsSelectStmt.java b/fe/src/main/java/org/apache/impala/analysis/CreateTableAsSelectStmt.java
index b111d560e..4df4a01de 100644
--- a/fe/src/main/java/org/apache/impala/analysis/CreateTableAsSelectStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/CreateTableAsSelectStmt.java
@@ -224,7 +224,7 @@ public class CreateTableAsSelectStmt extends StatementBase {
       FeTable tmpTable = null;
       if (KuduTable.isKuduTable(msTbl)) {
         tmpTable = db.createKuduCtasTarget(msTbl, createStmt_.getColumnDefs(),
-            createStmt_.getPrimaryKeyColumnDefs(),
+            createStmt_.getPrimaryKeyColumnDefs(), createStmt_.isPrimaryKeyUnique(),
             createStmt_.getKuduPartitionParams());
       } else if (IcebergTable.isIcebergTable(msTbl)) {
         IcebergPartitionSpec partSpec = null;
diff --git a/fe/src/main/java/org/apache/impala/analysis/CreateTableLikeFileStmt.java b/fe/src/main/java/org/apache/impala/analysis/CreateTableLikeFileStmt.java
index d0efe18ed..48571a4cb 100644
--- a/fe/src/main/java/org/apache/impala/analysis/CreateTableLikeFileStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/CreateTableLikeFileStmt.java
@@ -54,7 +54,8 @@ public class CreateTableLikeFileStmt extends CreateTableStmt {
         schemaLocation_.toString());
     String s = ToSqlUtils.getCreateTableSql(getDb(),
         getTbl() + " __LIKE_FILEFORMAT__ ",  getComment(), colsSql, partitionColsSql,
-        null, null, null, new Pair<>(getSortColumns(), getSortingOrder()),
+        /* isPrimaryKeyUnique */true, /* primaryKeysSql */null, /* foreignKeysSql */null,
+        /* kuduPartitionByParams */null, new Pair<>(getSortColumns(), getSortingOrder()),
         getTblProperties(), getSerdeProperties(), isExternal(), getIfNotExists(),
         getRowFormat(), HdfsFileFormat.fromThrift(getFileFormat()), compression, null,
         getLocation(), null, null);
diff --git a/fe/src/main/java/org/apache/impala/analysis/CreateTableStmt.java b/fe/src/main/java/org/apache/impala/analysis/CreateTableStmt.java
index ad72039bd..47c1f3b58 100644
--- a/fe/src/main/java/org/apache/impala/analysis/CreateTableStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/CreateTableStmt.java
@@ -116,6 +116,7 @@ public class CreateTableStmt extends StatementBase {
   public List<ColumnDef> getPrimaryKeyColumnDefs() {
     return tableDef_.getPrimaryKeyColumnDefs();
   }
+  public boolean isPrimaryKeyUnique() { return tableDef_.isPrimaryKeyUnique(); }
   public List<SQLPrimaryKey> getPrimaryKeys() { return tableDef_.getSqlPrimaryKeys(); }
   public List<SQLForeignKey> getForeignKeys() { return tableDef_.getSqlForeignKeys(); }
   public boolean isExternal() { return tableDef_.isExternal(); }
@@ -220,6 +221,7 @@ public class CreateTableStmt extends StatementBase {
     params.setTable_properties(Maps.newHashMap(getTblProperties()));
     params.getTable_properties().putAll(Maps.newHashMap(getGeneratedKuduProperties()));
     params.setSerde_properties(getSerdeProperties());
+    params.setIs_primary_key_unique(isPrimaryKeyUnique());
     for (KuduPartitionParam d: getKuduPartitionParams()) {
       params.addToPartition_by(d.toThrift());
     }
diff --git a/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java b/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
index 97ab4202d..d7712508a 100644
--- a/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
@@ -358,7 +358,11 @@ public class InsertStmt extends StatementBase {
       analysisColumnPermutation = new ArrayList<>();
       List<Column> tableColumns = table_.getColumns();
       for (int i = numClusteringCols; i < tableColumns.size(); ++i) {
-        analysisColumnPermutation.add(tableColumns.get(i).getName());
+        Column c = tableColumns.get(i);
+        // Omit auto-incrementing column for Kudu table since the values of the column
+        // will be assigned by Kudu engine.
+        if (c instanceof KuduColumn && ((KuduColumn)c).isAutoIncrementing()) continue;
+        analysisColumnPermutation.add(c.getName());
       }
     }
 
@@ -492,6 +496,9 @@ public class InsertStmt extends StatementBase {
     if (isUpsert_) {
       if (!(table_ instanceof FeKuduTable)) {
         throw new AnalysisException("UPSERT is only supported for Kudu tables");
+      } else if (((FeKuduTable)table_).hasAutoIncrementingColumn()) {
+        throw new AnalysisException(
+            "UPSERT is not supported for Kudu tables with auto-incrementing column");
       }
     } else {
       analyzeTableForInsert(analyzer);
@@ -769,8 +776,12 @@ public class InsertStmt extends StatementBase {
     List<String> keyColumns = ((FeKuduTable) table_).getPrimaryKeyColumnNames();
     List<String> missingKeyColumnNames = new ArrayList<>();
     for (Column column : table_.getColumns()) {
+      Preconditions.checkState(column instanceof KuduColumn);
+      // Omit auto-incrementing column for Kudu table since the values of the column
+      // will be assigned by Kudu engine.
       if (!mentionedColumnNames.contains(column.getName())
-          && keyColumns.contains(column.getName())) {
+          && keyColumns.contains(column.getName())
+          && !((KuduColumn)column).isAutoIncrementing()) {
         missingKeyColumnNames.add(column.getName());
       }
     }
@@ -969,7 +980,8 @@ public class InsertStmt extends StatementBase {
           if (isKuduTable) {
             Preconditions.checkState(tblColumn instanceof KuduColumn);
             KuduColumn kuduCol = (KuduColumn) tblColumn;
-            if (!kuduCol.hasDefaultValue() && !kuduCol.isNullable()) {
+            if (!kuduCol.hasDefaultValue() && !kuduCol.isNullable()
+                && !kuduCol.isAutoIncrementing()) {
               throw new AnalysisException("Missing values for column that is not " +
                   "nullable and has no default value " + kuduCol.getName());
             }
diff --git a/fe/src/main/java/org/apache/impala/analysis/ModifyStmt.java b/fe/src/main/java/org/apache/impala/analysis/ModifyStmt.java
index e30018fd8..91079f310 100644
--- a/fe/src/main/java/org/apache/impala/analysis/ModifyStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/ModifyStmt.java
@@ -31,6 +31,7 @@ import org.apache.impala.authorization.Privilege;
 import org.apache.impala.catalog.Column;
 import org.apache.impala.catalog.FeKuduTable;
 import org.apache.impala.catalog.FeTable;
+import org.apache.impala.catalog.KuduColumn;
 import org.apache.impala.catalog.Type;
 import org.apache.impala.common.AnalysisException;
 import org.apache.impala.common.Pair;
@@ -285,7 +286,10 @@ public abstract class ModifyStmt extends StatementBase {
       }
 
       if (keySlots.contains(lhsSlotRef.getSlotId())) {
-        throw new AnalysisException(format("Key column '%s' cannot be updated.",
+        boolean isSystemGeneratedColumn =
+            c instanceof KuduColumn && ((KuduColumn)c).isAutoIncrementing();
+        throw new AnalysisException(format("%s column '%s' cannot be updated.",
+            isSystemGeneratedColumn ? "System generated key" : "Key",
             lhsSlotRef.toSql()));
       }
 
diff --git a/fe/src/main/java/org/apache/impala/analysis/SelectStmt.java b/fe/src/main/java/org/apache/impala/analysis/SelectStmt.java
index b6a5ee2ab..e420462be 100644
--- a/fe/src/main/java/org/apache/impala/analysis/SelectStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/SelectStmt.java
@@ -40,6 +40,7 @@ import org.apache.impala.catalog.FeIcebergTable.Utils;
 import org.apache.impala.catalog.FeKuduTable;
 import org.apache.impala.catalog.FeTable;
 import org.apache.impala.catalog.FeView;
+import org.apache.impala.catalog.KuduColumn;
 import org.apache.impala.catalog.MapType;
 import org.apache.impala.catalog.StructField;
 import org.apache.impala.catalog.StructType;
@@ -828,6 +829,8 @@ public class SelectStmt extends QueryStmt {
         TupleDescriptor tupleDesc = resolvedPath.destTupleDesc();
         FeTable table = tupleDesc.getTable();
         for (Column c: table.getColumnsInHiveOrder()) {
+          // Omit auto-incrementing column for Kudu table since it's a hidden column.
+          if (c instanceof KuduColumn && ((KuduColumn)c).isAutoIncrementing()) continue;
           addStarExpandedPath(selectListItem, resolvedPath, c.getName());
         }
       } else {
diff --git a/fe/src/main/java/org/apache/impala/analysis/TableDef.java b/fe/src/main/java/org/apache/impala/analysis/TableDef.java
index 05da2420a..7f83565d1 100644
--- a/fe/src/main/java/org/apache/impala/analysis/TableDef.java
+++ b/fe/src/main/java/org/apache/impala/analysis/TableDef.java
@@ -25,6 +25,7 @@ import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.TreeMap;
 import java.util.UUID;
 import java.util.stream.Collectors;
 
@@ -49,6 +50,7 @@ import org.apache.impala.thrift.THdfsFileFormat;
 import org.apache.impala.thrift.TQueryOptions;
 import org.apache.impala.thrift.TSortingOrder;
 import org.apache.impala.util.AcidUtils;
+import org.apache.impala.util.KuduUtil;
 import org.apache.impala.util.MetaStoreUtil;
 import org.apache.thrift.TException;
 
@@ -83,6 +85,11 @@ class TableDef {
   // mean no primary keys were specified as the columnDefs_ could contain primary keys.
   private final List<String> primaryKeyColNames_ = new ArrayList<>();
 
+  // If true, the primary key is unique. If not, an auto-incrementing column will be
+  // added automatically by Kudu engine. This extra key column helps produce a unique
+  // composite primary key (primary keys + auto-incrementing construct).
+  private boolean isPrimaryKeyUnique_;
+
   // If true, the table's data will be preserved if dropped.
   private final boolean isExternal_;
 
@@ -356,9 +363,14 @@ class TableDef {
     return columnDefs_.stream().map(col -> col.getType()).collect(Collectors.toList());
   }
 
-  public void setPrimaryKey(TableDef.PrimaryKey primaryKey_) {
-    this.primaryKey_ = primaryKey_;
+  public void setPrimaryKey(TableDef.PrimaryKey primaryKey) {
+    this.primaryKey_ = primaryKey;
+  }
+
+  public void setPrimaryKeyUnique(boolean isKeyUnique) {
+    this.isPrimaryKeyUnique_ = isKeyUnique;
   }
+
   List<String> getPartitionColumnNames() {
     return ColumnDef.toColumnNames(getPartitionColumnDefs());
   }
@@ -371,6 +383,7 @@ class TableDef {
   boolean isIcebergTable() { return options_.fileFormat == THdfsFileFormat.ICEBERG; }
   List<String> getPrimaryKeyColumnNames() { return primaryKeyColNames_; }
   List<ColumnDef> getPrimaryKeyColumnDefs() { return primaryKeyColDefs_; }
+  boolean isPrimaryKeyUnique() { return isPrimaryKeyUnique_; }
   boolean isExternal() { return isExternal_; }
   boolean getIfNotExists() { return ifNotExists_; }
   Map<String, String> getGeneratedProperties() { return generatedProperties_; }
@@ -417,7 +430,7 @@ class TableDef {
     fqTableName_.analyze();
     analyzeAcidProperties(analyzer);
     analyzeColumnDefs(analyzer);
-    analyzePrimaryKeys();
+    analyzePrimaryKeys(analyzer);
     analyzeForeignKeys(analyzer);
 
     if (analyzer.dbContainsTable(getTblName().getDb(), getTbl(), Privilege.CREATE)
@@ -493,28 +506,75 @@ class TableDef {
    * in the table column definitions and if composite primary keys are properly defined
    * using the PRIMARY KEY (col,..col) clause.
    */
-  private void analyzePrimaryKeys() throws AnalysisException {
+  private void analyzePrimaryKeys(Analyzer analyzer) throws AnalysisException {
     for (ColumnDef colDef: columnDefs_) {
-      if (colDef.isPrimaryKey()) primaryKeyColDefs_.add(colDef);
+      if (colDef.isPrimaryKey()) {
+        primaryKeyColDefs_.add(colDef);
+        if (!colDef.isPrimaryKeyUnique() && !isKuduTable()) {
+          throw new AnalysisException(
+              "Non unique primary key is only supported for Kudu.");
+        }
+      }
     }
     if (primaryKeyColDefs_.size() > 1) {
-      throw new AnalysisException("Multiple primary keys specified. " +
-          "Composite primary keys can be specified using the " +
-          "PRIMARY KEY (col1, col2, ...) syntax at the end of the column definition.");
+      String primaryKeyString =
+          KuduUtil.getPrimaryKeyString(primaryKeyColDefs_.get(0).isPrimaryKeyUnique());
+      throw new AnalysisException(String.format(
+          "Multiple %sS specified. Composite %s can be specified using the %s " +
+          "(col1, col2, ...) syntax at the end of the column definition.",
+          primaryKeyString, primaryKeyString, primaryKeyString));
     }
 
     if (primaryKeyColNames_.isEmpty()) {
       if (primaryKey_ == null || primaryKey_.getPrimaryKeyColNames().isEmpty()) {
-        return;
+        if (!isKuduTable()) return;
+
+        if (!primaryKeyColDefs_.isEmpty()) {
+          setPrimaryKeyUnique(primaryKeyColDefs_.get(0).isPrimaryKeyUnique());
+          return;
+        } else if (!getKuduPartitionParams().isEmpty()) {
+          // Promote all partition columns as non unique primary key columns if primary
+          // keys are not declared by the user for the Kudu table. Since key columns
+          // must be as the first columns in the table, only all partition columns which
+          // are the beginning columns of the table can be promoted as non unique primary
+          // key columns.
+          List<String> colNames = getColumnNames();
+          TreeMap<Integer, String> partitionCols = new TreeMap<Integer, String>();
+          for (KuduPartitionParam partitionParam: getKuduPartitionParams()) {
+            for (String colName: partitionParam.getColumnNames()) {
+              int index = colNames.indexOf(colName);
+              Preconditions.checkState(index >= 0);
+              partitionCols.put(index, colName);
+            }
+          }
+          if (partitionCols.size() > 0
+              && partitionCols.lastKey() == partitionCols.size() - 1) {
+            primaryKeyColNames_.addAll(partitionCols.values());
+            setPrimaryKeyUnique(false);
+            analyzer.addWarning(String.format(
+                "Partition columns (%s) are promoted as non unique primary key.",
+                String.join(", ", partitionCols.values())));
+          } else {
+            throw new AnalysisException(
+                "Specify primary key or non unique primary key for the Kudu table, " +
+                "or create partitions with the beginning columns of the table.");
+          }
+        }
+        if (primaryKeyColNames_.isEmpty()) return;
       } else {
         primaryKeyColNames_.addAll(primaryKey_.getPrimaryKeyColNames());
       }
     }
 
+    String primaryKeyString = KuduUtil.getPrimaryKeyString(isPrimaryKeyUnique_);
     if (!primaryKeyColDefs_.isEmpty()) {
-      throw new AnalysisException("Multiple primary keys specified. " +
-          "Composite primary keys can be specified using the " +
-          "PRIMARY KEY (col1, col2, ...) syntax at the end of the column definition.");
+      throw new AnalysisException(String.format(
+          "Multiple %sS specified. Composite %s can be specified using the %s " +
+          "(col1, col2, ...) syntax at the end of the column definition.",
+          primaryKeyString, primaryKeyString, primaryKeyString));
+    } else if (!primaryKeyColNames_.isEmpty() && !isPrimaryKeyUnique()
+        && !isKuduTable()) {
+      throw new AnalysisException(primaryKeyString + " is only supported for Kudu.");
     }
     Map<String, ColumnDef> colDefsByColName = ColumnDef.mapByColumnNames(columnDefs_);
     int keySeq = 1;
@@ -525,13 +585,13 @@ class TableDef {
       if (colDef == null) {
         if (ColumnDef.toColumnNames(primaryKeyColDefs_).contains(colName)) {
           throw new AnalysisException(String.format("Column '%s' is listed multiple " +
-              "times as a PRIMARY KEY.", colName));
+              "times as a %s.", colName, primaryKeyString));
         }
-        throw new AnalysisException(String.format(
-            "PRIMARY KEY column '%s' does not exist in the table", colName));
+        throw new AnalysisException(String.format("%s column '%s' does not exist in " +
+            "the table", primaryKeyString, colName));
       }
       if (colDef.isExplicitNullable()) {
-        throw new AnalysisException("Primary key columns cannot be nullable: " +
+        throw new AnalysisException(primaryKeyString + " columns cannot be nullable: " +
             colDef.toString());
       }
       // HDFS Table specific analysis.
diff --git a/fe/src/main/java/org/apache/impala/analysis/ToSqlUtils.java b/fe/src/main/java/org/apache/impala/analysis/ToSqlUtils.java
index 8136eb06c..0fbb7ef1f 100755
--- a/fe/src/main/java/org/apache/impala/analysis/ToSqlUtils.java
+++ b/fe/src/main/java/org/apache/impala/analysis/ToSqlUtils.java
@@ -290,7 +290,8 @@ public class ToSqlUtils {
     String icebergPartitionSpecs = getIcebergPartitionSpecsSql(stmt);
     // TODO: Pass the correct compression, if applicable.
     return getCreateTableSql(stmt.getDb(), stmt.getTbl(), stmt.getComment(), colsSql,
-        partitionColsSql, stmt.getTblPrimaryKeyColumnNames(), stmt.getForeignKeysSql(),
+        partitionColsSql, stmt.isPrimaryKeyUnique(),
+        stmt.getTblPrimaryKeyColumnNames(), stmt.getForeignKeysSql(),
         kuduParamsSql, new Pair<>(stmt.getSortColumns(), stmt.getSortingOrder()),
         properties, stmt.getSerdeProperties(), stmt.isExternal(), stmt.getIfNotExists(),
         stmt.getRowFormat(), HdfsFileFormat.fromThrift(stmt.getFileFormat()),
@@ -326,7 +327,7 @@ public class ToSqlUtils {
     String icebergPartitionSpecs = getIcebergPartitionSpecsSql(innerStmt);
     // TODO: Pass the correct compression, if applicable.
     String createTableSql = getCreateTableSql(innerStmt.getDb(), innerStmt.getTbl(),
-        innerStmt.getComment(), null, partitionColsSql,
+        innerStmt.getComment(), null, partitionColsSql, innerStmt.isPrimaryKeyUnique(),
         innerStmt.getTblPrimaryKeyColumnNames(), innerStmt.getForeignKeysSql(),
         kuduParamsSql, new Pair<>(innerStmt.getSortColumns(),
         innerStmt.getSortingOrder()), properties, innerStmt.getSerdeProperties(),
@@ -379,6 +380,7 @@ public class ToSqlUtils {
     TBucketInfo bucketInfo = BucketUtils.fromStorageDescriptor(msTable.getSd());
 
     String storageHandlerClassName = table.getStorageHandlerClassName();
+    boolean isPrimaryKeyUnique = true;
     List<String> primaryKeySql = new ArrayList<>();
     List<String> foreignKeySql = new ArrayList<>();
     String kuduPartitionByParams = null;
@@ -403,6 +405,7 @@ public class ToSqlUtils {
       // Internal property, should not be exposed to the user.
       properties.remove(StatsSetupConst.DO_NOT_UPDATE_STATS);
 
+      isPrimaryKeyUnique = kuduTable.isPrimaryKeyUnique();
       if (KuduTable.isSynchronizedTable(msTable)) {
         primaryKeySql.addAll(kuduTable.getPrimaryKeyColumnNames());
 
@@ -448,8 +451,9 @@ public class ToSqlUtils {
     }
 
     HdfsUri tableLocation = location == null ? null : new HdfsUri(location);
-    return getCreateTableSql(table.getDb().getName(), table.getName(), comment, colsSql,
-        partitionColsSql, primaryKeySql, foreignKeySql, kuduPartitionByParams,
+    return getCreateTableSql(
+        table.getDb().getName(), table.getName(), comment, colsSql, partitionColsSql,
+        isPrimaryKeyUnique, primaryKeySql, foreignKeySql, kuduPartitionByParams,
         new Pair<>(sortColsSql, sortingOrder), properties, serdeParameters,
         isExternal, false, rowFormat, format, compression,
         storageHandlerClassName, tableLocation, icebergPartitions, bucketInfo);
@@ -462,11 +466,11 @@ public class ToSqlUtils {
    */
   public static String getCreateTableSql(String dbName, String tableName,
       String tableComment, List<String> columnsSql, List<String> partitionColumnsSql,
-      List<String> primaryKeysSql, List<String> foreignKeysSql,
-      String kuduPartitionByParams, Pair<List<String>, TSortingOrder> sortProperties,
-      Map<String, String> tblProperties, Map<String, String> serdeParameters,
-      boolean isExternal, boolean ifNotExists, RowFormat rowFormat,
-      HdfsFileFormat fileFormat, HdfsCompression compression,
+      boolean isPrimaryKeyUnique, List<String> primaryKeysSql,
+      List<String> foreignKeysSql, String kuduPartitionByParams, Pair<List<String>,
+      TSortingOrder> sortProperties, Map<String, String> tblProperties,
+      Map<String, String> serdeParameters, boolean isExternal, boolean ifNotExists,
+      RowFormat rowFormat, HdfsFileFormat fileFormat, HdfsCompression compression,
       String storageHandlerClass, HdfsUri location, String icebergPartitions,
       TBucketInfo bucketInfo) {
     Preconditions.checkNotNull(tableName);
@@ -480,7 +484,8 @@ public class ToSqlUtils {
       sb.append(" (\n  ");
       sb.append(Joiner.on(",\n  ").join(columnsSql));
       if (CollectionUtils.isNotEmpty(primaryKeysSql)) {
-        sb.append(",\n  PRIMARY KEY (");
+        sb.append(",\n  ");
+        sb.append(KuduUtil.getPrimaryKeyString(isPrimaryKeyUnique)).append(" (");
         Joiner.on(", ").appendTo(sb, primaryKeysSql).append(")");
       }
       if (CollectionUtils.isNotEmpty(foreignKeysSql)) {
@@ -491,7 +496,8 @@ public class ToSqlUtils {
     } else {
       // CTAS for Kudu tables still print the primary key
       if (primaryKeysSql != null && !primaryKeysSql.isEmpty()) {
-        sb.append("\n PRIMARY KEY (");
+        sb.append("\n ");
+        sb.append(KuduUtil.getPrimaryKeyString(isPrimaryKeyUnique)).append(" (");
         Joiner.on(", ").appendTo(sb, primaryKeysSql).append(")");
       }
     }
diff --git a/fe/src/main/java/org/apache/impala/catalog/Db.java b/fe/src/main/java/org/apache/impala/catalog/Db.java
index 8a79cf3e7..0cb800b0c 100644
--- a/fe/src/main/java/org/apache/impala/catalog/Db.java
+++ b/fe/src/main/java/org/apache/impala/catalog/Db.java
@@ -238,11 +238,11 @@ public class Db extends CatalogObjectImpl implements FeDb {
 
   @Override
   public FeKuduTable createKuduCtasTarget(
-      org.apache.hadoop.hive.metastore.api.Table msTbl,
-      List<ColumnDef> columnDefs, List<ColumnDef> primaryKeyColumnDefs,
-      List<KuduPartitionParam> kuduPartitionParams) {
-    return KuduTable.createCtasTarget(this, msTbl, columnDefs, primaryKeyColumnDefs,
-        kuduPartitionParams);
+      org.apache.hadoop.hive.metastore.api.Table msTbl, List<ColumnDef> columnDefs,
+      List<ColumnDef> primaryKeyColumnDefs, boolean isPrimaryKeyUnique,
+      List<KuduPartitionParam> kuduPartitionParams) throws ImpalaRuntimeException {
+    return KuduTable.createCtasTarget(this, msTbl, columnDefs, isPrimaryKeyUnique,
+        primaryKeyColumnDefs, kuduPartitionParams);
   }
 
   @Override
diff --git a/fe/src/main/java/org/apache/impala/catalog/FeDb.java b/fe/src/main/java/org/apache/impala/catalog/FeDb.java
index f927a4687..f8cc4715d 100644
--- a/fe/src/main/java/org/apache/impala/catalog/FeDb.java
+++ b/fe/src/main/java/org/apache/impala/catalog/FeDb.java
@@ -23,6 +23,8 @@ import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.impala.analysis.ColumnDef;
 import org.apache.impala.analysis.KuduPartitionParam;
+import org.apache.impala.common.ImpalaException;
+import org.apache.impala.common.ImpalaRuntimeException;
 import org.apache.impala.thrift.TDatabase;
 import org.apache.impala.thrift.TFunctionCategory;
 import org.apache.impala.util.PatternMatcher;
@@ -121,8 +123,8 @@ public interface FeDb extends HasName {
    * Create a target Kudu table object for CTAS.
    */
   FeKuduTable createKuduCtasTarget(Table msTbl, List<ColumnDef> columnDefs,
-      List<ColumnDef> primaryKeyColumnDefs,
-      List<KuduPartitionParam> kuduPartitionParams);
+      List<ColumnDef> primaryKeyColumnDefs, boolean isPrimaryKeyUnique,
+      List<KuduPartitionParam> kuduPartitionParams) throws ImpalaRuntimeException;
 
   /**
    * Create a target FS table object for CTAS.
diff --git a/fe/src/main/java/org/apache/impala/catalog/FeKuduTable.java b/fe/src/main/java/org/apache/impala/catalog/FeKuduTable.java
index 60688e3e5..ec0336bbe 100644
--- a/fe/src/main/java/org/apache/impala/catalog/FeKuduTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/FeKuduTable.java
@@ -57,6 +57,16 @@ public interface FeKuduTable extends FeTable {
    */
   String getKuduTableName();
 
+  /**
+   * Return true if the primary key is unique.
+   */
+  boolean isPrimaryKeyUnique();
+
+  /**
+   * Return true if the table has auto-incrementing column.
+   */
+  boolean hasAutoIncrementingColumn();
+
   /**
    * Return the names of the columns that make up the primary key
    * of this table.
diff --git a/fe/src/main/java/org/apache/impala/catalog/KuduColumn.java b/fe/src/main/java/org/apache/impala/catalog/KuduColumn.java
index 03dcd9606..a39cf613e 100644
--- a/fe/src/main/java/org/apache/impala/catalog/KuduColumn.java
+++ b/fe/src/main/java/org/apache/impala/catalog/KuduColumn.java
@@ -27,23 +27,29 @@ import org.apache.impala.util.KuduUtil;
 import org.apache.kudu.ColumnSchema.CompressionAlgorithm;
 import org.apache.kudu.ColumnSchema.Encoding;
 import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.Schema;
 
 /**
  *  Represents a Kudu column.
  *
  *  This class extends Column with Kudu-specific information:
  *  - primary key
+ *  - primary key unique
  *  - nullability constraint
+ *  - auto_incrementing
  *  - encoding
  *  - compression
  *  - default value
  *  - desired block size
  */
 public class KuduColumn extends Column {
+
   // The name of the column as it appears in Kudu, i.e. not converted to lower case.
   private final String kuduName_;
   private final boolean isKey_;
+  private final boolean isPrimaryKeyUnique_;
   private final boolean isNullable_;
+  private final boolean isAutoIncrementing_;
   private final Encoding encoding_;
   private final CompressionAlgorithm compression_;
   private final int blockSize_;
@@ -55,15 +61,23 @@ public class KuduColumn extends Column {
   // to hide this complexity externally.
   private final LiteralExpr defaultValue_;
 
-  private KuduColumn(String name, Type type, boolean isKey, boolean isNullable,
-      Encoding encoding, CompressionAlgorithm compression, LiteralExpr defaultValue,
-      int blockSize, String comment, int position) {
+  private KuduColumn(String name, Type type, boolean isKey, boolean isPrimaryKeyUnique,
+      boolean isNullable, boolean isAutoIncrementing, Encoding encoding,
+      CompressionAlgorithm compression, LiteralExpr defaultValue, int blockSize,
+      String comment, int position) {
     super(name.toLowerCase(), type, comment, position);
     Preconditions.checkArgument(defaultValue == null || type == defaultValue.getType()
         || (type.isTimestamp() && defaultValue.getType().isIntegerType()));
+    if (isKey) {
+      Preconditions.checkArgument(!isPrimaryKeyUnique || !isAutoIncrementing);
+    } else {
+      Preconditions.checkArgument(!isPrimaryKeyUnique && !isAutoIncrementing);
+    }
     kuduName_ = name;
     isKey_ = isKey;
+    isPrimaryKeyUnique_ = isPrimaryKeyUnique;
     isNullable_ = isNullable;
+    isAutoIncrementing_ = isAutoIncrementing;
     encoding_ = encoding;
     compression_ = compression;
     defaultValue_ = defaultValue;
@@ -88,15 +102,17 @@ public class KuduColumn extends Column {
     }
     String comment = !colSchema.getComment().isEmpty() ? colSchema.getComment() : null;
     return new KuduColumn(colSchema.getName(), type, colSchema.isKey(),
-        colSchema.isNullable(), colSchema.getEncoding(),
-        colSchema.getCompressionAlgorithm(), defaultValueExpr,
+        colSchema.isKeyUnique(), colSchema.isNullable(), colSchema.isAutoIncrementing(),
+        colSchema.getEncoding(), colSchema.getCompressionAlgorithm(), defaultValueExpr,
         colSchema.getDesiredBlockSize(), comment, position);
   }
 
   public static KuduColumn fromThrift(TColumn column, int position)
       throws ImpalaRuntimeException {
     Preconditions.checkState(column.isSetIs_key());
-    Preconditions.checkState(column.isSetIs_nullable());
+    Preconditions.checkState(column.isSetIs_primary_key_unique());
+    boolean isNullable = false;
+    if (column.isSetIs_nullable()) isNullable = column.isIs_nullable();
     Type columnType = Type.fromThrift(column.getColumnType());
     Encoding encoding = null;
     if (column.isSetEncoding()) encoding = KuduUtil.fromThrift(column.getEncoding());
@@ -116,13 +132,30 @@ public class KuduColumn extends Column {
     String comment = (column.isSetComment() && !column.getComment().isEmpty()) ?
         column.getComment() : null;
     return new KuduColumn(column.getKudu_column_name(), columnType, column.isIs_key(),
-        column.isIs_nullable(), encoding, compression, defaultValue, blockSize, comment,
-        position);
+        column.isIs_primary_key_unique(), isNullable, column.isIs_auto_incrementing(),
+        encoding, compression, defaultValue, blockSize, comment, position);
+  }
+
+  // Create KuduColumn for auto-incrementing column in given 'position'.
+  // This function is called when creating temporary KuduTable object for CTAS when
+  // the primary key is not unique.
+  public static KuduColumn createAutoIncrementingColumn(int position)
+      throws ImpalaRuntimeException {
+    org.apache.kudu.Type kuduType = Schema.getAutoIncrementingColumnType();
+    Preconditions.checkArgument(kuduType != org.apache.kudu.Type.DECIMAL &&
+        kuduType != org.apache.kudu.Type.VARCHAR);
+    Type type = KuduUtil.toImpalaType(kuduType, null);
+    return new KuduColumn(Schema.getAutoIncrementingColumnName(), type,
+        /* isKey */true, /* isPrimaryKeyUnique */false, /* isNullable */false,
+        /* isAutoIncrementing */true, /* encoding */null, /* compression */null,
+        /* defaultValue */null, /* blockSize */0, /* comment */"", position);
   }
 
   public String getKuduName() { return kuduName_; }
   public boolean isKey() { return isKey_; }
+  public boolean isPrimaryKeyUnique() { return isPrimaryKeyUnique_; }
   public boolean isNullable() { return isNullable_; }
+  public boolean isAutoIncrementing() { return isAutoIncrementing_; }
   public Encoding getEncoding() { return encoding_; }
   public CompressionAlgorithm getCompression() { return compression_; }
   public int getBlockSize() { return blockSize_; }
@@ -154,8 +187,9 @@ public class KuduColumn extends Column {
   @Override
   public TColumn toThrift() {
     TColumn colDesc = new TColumn(name_, type_.toThrift());
-    KuduUtil.setColumnOptions(colDesc, isKey_, isNullable_, encoding_, compression_,
-        defaultValue_, blockSize_, kuduName_);
+    KuduUtil.setColumnOptions(
+        colDesc, isKey_, isPrimaryKeyUnique_, isNullable_, isAutoIncrementing_,
+        encoding_, compression_, defaultValue_, blockSize_, kuduName_);
     if (comment_ != null) colDesc.setComment(comment_);
     colDesc.setCol_stats(getStats().toThrift());
     colDesc.setPosition(position_);
diff --git a/fe/src/main/java/org/apache/impala/catalog/KuduTable.java b/fe/src/main/java/org/apache/impala/catalog/KuduTable.java
index 2ce418aa0..8f1600a2c 100644
--- a/fe/src/main/java/org/apache/impala/catalog/KuduTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/KuduTable.java
@@ -33,6 +33,7 @@ import org.apache.impala.analysis.ColumnDef;
 import org.apache.impala.analysis.KuduPartitionParam;
 import org.apache.impala.common.ImpalaRuntimeException;
 import org.apache.impala.thrift.TCatalogObjectType;
+import org.apache.impala.thrift.TColumn;
 import org.apache.impala.thrift.TKuduPartitionByHashParam;
 import org.apache.impala.thrift.TKuduPartitionByRangeParam;
 import org.apache.impala.thrift.TKuduPartitionParam;
@@ -42,6 +43,7 @@ import org.apache.impala.thrift.TTableDescriptor;
 import org.apache.impala.thrift.TTableType;
 import org.apache.impala.util.KuduUtil;
 import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.Schema;
 import org.apache.kudu.client.HiveMetastoreConfig;
 import org.apache.kudu.client.KuduClient;
 import org.apache.kudu.client.KuduException;
@@ -103,6 +105,12 @@ public class KuduTable extends Table implements FeKuduTable {
   // Comma separated list of Kudu master hosts with optional ports.
   private String kuduMasters_;
 
+  // Set to true if primary key is unique.
+  private boolean isPrimaryKeyUnique_ = true;
+
+  // Set to true if the table has auto-incrementing column.
+  private boolean hasAutoIncrementingColumn_ = false;
+
   // Primary key column names, the column names are all in lower case.
   private final List<String> primaryKeyColumnNames_ = new ArrayList<>();
 
@@ -182,6 +190,12 @@ public class KuduTable extends Table implements FeKuduTable {
 
   public org.apache.kudu.Schema getKuduSchema() { return kuduSchema_; }
 
+  @Override
+  public boolean isPrimaryKeyUnique() { return isPrimaryKeyUnique_; }
+
+  @Override
+  public boolean hasAutoIncrementingColumn() { return hasAutoIncrementingColumn_; }
+
   @Override
   public List<String> getPrimaryKeyColumnNames() {
     return ImmutableList.copyOf(primaryKeyColumnNames_);
@@ -374,6 +388,9 @@ public class KuduTable extends Table implements FeKuduTable {
 
     int pos = 0;
     kuduSchema_ = kuduTable.getSchema();
+    isPrimaryKeyUnique_ = kuduSchema_.isPrimaryKeyUnique();
+    hasAutoIncrementingColumn_ = kuduSchema_.hasAutoIncrementingColumn();
+    Preconditions.checkState(!isPrimaryKeyUnique_ || !hasAutoIncrementingColumn_);
     for (ColumnSchema colSchema: kuduSchema_.getColumns()) {
       KuduColumn kuduCol = KuduColumn.fromColumnSchema(colSchema, pos);
       Preconditions.checkNotNull(kuduCol);
@@ -397,15 +414,30 @@ public class KuduTable extends Table implements FeKuduTable {
    */
   public static KuduTable createCtasTarget(Db db,
       org.apache.hadoop.hive.metastore.api.Table msTbl, List<ColumnDef> columnDefs,
-      List<ColumnDef> primaryKeyColumnDefs, List<KuduPartitionParam> partitionParams) {
+      boolean isPrimaryKeyUnique, List<ColumnDef> primaryKeyColumnDefs,
+      List<KuduPartitionParam> partitionParams)
+      throws ImpalaRuntimeException {
     KuduTable tmpTable = new KuduTable(msTbl, db, msTbl.getTableName(), msTbl.getOwner());
+    tmpTable.isPrimaryKeyUnique_ = isPrimaryKeyUnique;
     int pos = 0;
     for (ColumnDef colDef: columnDefs) {
-      tmpTable.addColumn(new Column(colDef.getColName(), colDef.getType(), pos++));
+      tmpTable.addColumn(KuduColumn.fromThrift(colDef.toThrift(), pos++));
+      // Simulate the Kudu engine by adding the auto-incrementing column as a key
+      // column of the temporary KuduTable when the primary key is not unique, so that
+      // the temporary KuduTable has the same layout as the table created by the Kudu
+      // engine. This lets the analysis module find the right position for each column.
+      if (!isPrimaryKeyUnique && pos == primaryKeyColumnDefs.size()) {
+        tmpTable.addColumn(KuduColumn.createAutoIncrementingColumn(pos++));
+        tmpTable.hasAutoIncrementingColumn_ = true;
+      }
     }
     for (ColumnDef pkColDef: primaryKeyColumnDefs) {
       tmpTable.primaryKeyColumnNames_.add(pkColDef.getColName());
     }
+    if (!isPrimaryKeyUnique) {
+      // Add the auto-incrementing column's name to the list of key column names.
+      tmpTable.primaryKeyColumnNames_.add(Schema.getAutoIncrementingColumnName());
+    }
     tmpTable.partitionBy_ = ImmutableList.copyOf(partitionParams);
     return tmpTable;
   }
@@ -426,6 +458,8 @@ public class KuduTable extends Table implements FeKuduTable {
     kuduMasters_ = Joiner.on(',').join(tkudu.getMaster_addresses());
     primaryKeyColumnNames_.clear();
     primaryKeyColumnNames_.addAll(tkudu.getKey_columns());
+    isPrimaryKeyUnique_ = tkudu.isIs_primary_key_unique();
+    hasAutoIncrementingColumn_ = tkudu.isHas_auto_incrementing();
     partitionBy_ = loadPartitionByParamsFromThrift(tkudu.getPartition_by());
   }
 
@@ -459,6 +493,8 @@ public class KuduTable extends Table implements FeKuduTable {
   private TKuduTable getTKuduTable() {
     TKuduTable tbl = new TKuduTable();
     tbl.setKey_columns(Preconditions.checkNotNull(primaryKeyColumnNames_));
+    tbl.setIs_primary_key_unique(isPrimaryKeyUnique_);
+    tbl.setHas_auto_incrementing(hasAutoIncrementingColumn_);
     tbl.setMaster_addresses(Lists.newArrayList(kuduMasters_.split(",")));
     tbl.setTable_name(kuduTableName_);
     Preconditions.checkNotNull(partitionBy_);
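To make the column-ordering rule in createCtasTarget() concrete, here is a small
self-contained sketch in plain Java (illustrative names, not code from this patch)
of the layout produced for a CTAS target with a single-column non unique primary key:

    import java.util.ArrayList;
    import java.util.List;

    // Sketch of the ordering rule: the auto-incrementing column is inserted right
    // after the last user-declared key column so the CTAS target matches the layout
    // the Kudu engine will create.
    public class CtasLayoutSketch {
      public static void main(String[] args) {
        List<String> columnDefs = List.of("id", "int_col");
        List<String> primaryKeyCols = List.of("id");
        boolean isPrimaryKeyUnique = false;

        List<String> layout = new ArrayList<>();
        int pos = 0;
        for (String col : columnDefs) {
          layout.add(col);
          ++pos;
          if (!isPrimaryKeyUnique && pos == primaryKeyCols.size()) {
            layout.add("auto_incrementing_id");
            ++pos;
          }
        }
        // Prints: [id, auto_incrementing_id, int_col]
        System.out.println(layout);
      }
    }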
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/LocalDb.java b/fe/src/main/java/org/apache/impala/catalog/local/LocalDb.java
index 610b9c235..82482be3c 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/LocalDb.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/LocalDb.java
@@ -35,6 +35,8 @@ import org.apache.impala.catalog.FeTable;
 import org.apache.impala.catalog.Function;
 import org.apache.impala.catalog.Function.CompareMode;
 import org.apache.impala.catalog.TableLoadingException;
+import org.apache.impala.common.ImpalaException;
+import org.apache.impala.common.ImpalaRuntimeException;
 import org.apache.impala.thrift.TBriefTableMeta;
 import org.apache.impala.thrift.TDatabase;
 import org.apache.impala.thrift.TFunctionCategory;
@@ -140,10 +142,10 @@ public class LocalDb implements FeDb {
 
   @Override
   public FeKuduTable createKuduCtasTarget(Table msTbl, List<ColumnDef> columnDefs,
-      List<ColumnDef> primaryKeyColumnDefs,
-      List<KuduPartitionParam> kuduPartitionParams) {
-    return LocalKuduTable.createCtasTarget(this, msTbl, columnDefs, primaryKeyColumnDefs,
-        kuduPartitionParams);
+      List<ColumnDef> primaryKeyColumnDefs, boolean isPrimaryKeyUnique,
+      List<KuduPartitionParam> kuduPartitionParams) throws ImpalaRuntimeException {
+    return LocalKuduTable.createCtasTarget(this, msTbl, columnDefs, isPrimaryKeyUnique,
+        primaryKeyColumnDefs, kuduPartitionParams);
   }
 
   @Override
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/LocalKuduTable.java b/fe/src/main/java/org/apache/impala/catalog/local/LocalKuduTable.java
index 36eb7167e..b2bd98a00 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/LocalKuduTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/LocalKuduTable.java
@@ -53,8 +53,10 @@ import com.google.errorprone.annotations.Immutable;
 
 public class LocalKuduTable extends LocalTable implements FeKuduTable {
   private final TableParams tableParams_;
-  private final List<KuduPartitionParam> partitionBy_;
+  private final boolean isPrimaryKeyUnique_;
+  private final boolean hasAutoIncrementingColumn_;
   private final ImmutableList<String> primaryKeyColumnNames_;
+  private final List<KuduPartitionParam> partitionBy_;
 
   private final org.apache.kudu.client.KuduTable kuduTable_;
 
@@ -81,7 +83,9 @@ public class LocalKuduTable extends LocalTable implements FeKuduTable {
     // Use the schema derived from Kudu, rather than the one stored in the HMS.
     msTable.getSd().setCols(fieldSchemas);
 
-
+    boolean isPrimaryKeyUnique = kuduTable.getSchema().isPrimaryKeyUnique();
+    boolean hasAutoIncrementingColumn =
+        kuduTable.getSchema().hasAutoIncrementingColumn();
     List<String> pkNames = new ArrayList<>();
     for (ColumnSchema c: kuduTable.getSchema().getPrimaryKeyColumns()) {
       pkNames.add(c.getName().toLowerCase());
@@ -91,32 +95,43 @@ public class LocalKuduTable extends LocalTable implements FeKuduTable {
 
     ColumnMap cmap = new ColumnMap(cols, /*numClusteringCols=*/0, fullTableName,
         /*isFullAcidSchema=*/false);
-    return new LocalKuduTable(db, msTable, ref, cmap, kuduTable, pkNames, partitionBy);
+    return new LocalKuduTable(db, msTable, ref, cmap, kuduTable, isPrimaryKeyUnique,
+        pkNames, hasAutoIncrementingColumn, partitionBy);
   }
 
-
   public static FeKuduTable createCtasTarget(LocalDb db, Table msTable,
-      List<ColumnDef> columnDefs, List<ColumnDef> primaryKeyColumnDefs,
-      List<KuduPartitionParam> kuduPartitionParams) {
+      List<ColumnDef> columnDefs, boolean isPrimaryKeyUnique,
+      List<ColumnDef> primaryKeyColumnDefs,
+      List<KuduPartitionParam> kuduPartitionParams) throws ImpalaRuntimeException {
     String fullTableName = msTable.getDbName() + "." + msTable.getTableName();
 
+    boolean hasAutoIncrementingColumn = false;
     List<Column> columns = new ArrayList<>();
     List<String> pkNames = new ArrayList<>();
     int pos = 0;
     for (ColumnDef colDef: columnDefs) {
-      // TODO(todd): it seems odd that for CTAS targets, the columns are of type
-      // 'Column' instead of 'KuduColumn'.
-      columns.add(new Column(colDef.getColName(), colDef.getType(), pos++));
+      columns.add(KuduColumn.fromThrift(colDef.toThrift(), pos++));
+      // Simulate the Kudu engine by adding the auto-incrementing column as a key
+      // column of the temporary KuduTable when the primary key is not unique, so that
+      // the analysis module can find the right position for each column.
+      if (!isPrimaryKeyUnique && pos == primaryKeyColumnDefs.size()) {
+        columns.add(KuduColumn.createAutoIncrementingColumn(pos++));
+        hasAutoIncrementingColumn = true;
+      }
     }
     for (ColumnDef pkColDef: primaryKeyColumnDefs) {
       pkNames.add(pkColDef.getColName());
     }
+    if (!isPrimaryKeyUnique) {
+      // Add the auto-incrementing column's name to the list of key column names.
+      pkNames.add(Schema.getAutoIncrementingColumnName());
+    }
 
     ColumnMap cmap = new ColumnMap(columns, /*numClusteringCols=*/0, fullTableName,
         /*isFullAcidSchema=*/false);
 
     return new LocalKuduTable(db, msTable, /*ref=*/null, cmap, /*kuduTable*/null,
-        pkNames, kuduPartitionParams);
+        isPrimaryKeyUnique, pkNames, hasAutoIncrementingColumn, kuduPartitionParams);
   }
 
   private static void convertColsFromKudu(Schema schema, List<Column> cols,
@@ -141,14 +156,16 @@ public class LocalKuduTable extends LocalTable implements FeKuduTable {
   }
 
   private LocalKuduTable(LocalDb db, Table msTable, TableMetaRef ref, ColumnMap cmap,
-      org.apache.kudu.client.KuduTable kuduTable,
-      List<String> primaryKeyColumnNames,
+      org.apache.kudu.client.KuduTable kuduTable, boolean isPrimaryKeyUnique,
+      List<String> primaryKeyColumnNames, boolean hasAutoIncrementingColumn,
       List<KuduPartitionParam> partitionBy)  {
     super(db, msTable, ref, cmap);
     kuduTable_ = kuduTable;
     tableParams_ = new TableParams(msTable);
     partitionBy_ = ImmutableList.copyOf(partitionBy);
+    isPrimaryKeyUnique_ = isPrimaryKeyUnique;
     primaryKeyColumnNames_ = ImmutableList.copyOf(primaryKeyColumnNames);
+    hasAutoIncrementingColumn_ = hasAutoIncrementingColumn;
   }
 
   @Override
@@ -168,6 +185,16 @@ public class LocalKuduTable extends LocalTable implements FeKuduTable {
     return kuduTable_;
   }
 
+  @Override
+  public boolean isPrimaryKeyUnique() {
+    return isPrimaryKeyUnique_;
+  }
+
+  @Override
+  public boolean hasAutoIncrementingColumn() {
+    return hasAutoIncrementingColumn_;
+  }
+
   @Override
   public List<String> getPrimaryKeyColumnNames() {
     return primaryKeyColumnNames_;
@@ -187,6 +214,8 @@ public class LocalKuduTable extends LocalTable implements FeKuduTable {
         getNumClusteringCols(),
         name_, db_.getName());
     TKuduTable tbl = new TKuduTable();
+    tbl.setIs_primary_key_unique(isPrimaryKeyUnique_);
+    tbl.setHas_auto_incrementing(hasAutoIncrementingColumn_);
     tbl.setKey_columns(Preconditions.checkNotNull(primaryKeyColumnNames_));
     tbl.setMaster_addresses(tableParams_.getMastersAsList());
     tbl.setTable_name(tableParams_.kuduTableName_);
diff --git a/fe/src/main/java/org/apache/impala/service/DescribeResultFactory.java b/fe/src/main/java/org/apache/impala/service/DescribeResultFactory.java
index 4b536d3d4..9f42d5547 100644
--- a/fe/src/main/java/org/apache/impala/service/DescribeResultFactory.java
+++ b/fe/src/main/java/org/apache/impala/service/DescribeResultFactory.java
@@ -300,6 +300,12 @@ public class DescribeResultFactory {
       // Kudu-specific describe info.
       TColumnValue pkCol = new TColumnValue();
       pkCol.setString_val(Boolean.toString(kuduColumn.isKey()));
+      TColumnValue pkUniqueCol = new TColumnValue();
+      if (kuduColumn.isKey()) {
+        pkUniqueCol.setString_val(Boolean.toString(kuduColumn.isPrimaryKeyUnique()));
+      } else {
+        pkUniqueCol.setString_val("");
+      }
       TColumnValue nullableCol = new TColumnValue();
       nullableCol.setString_val(Boolean.toString(kuduColumn.isNullable()));
       TColumnValue defaultValCol = new TColumnValue();
@@ -315,8 +321,8 @@ public class DescribeResultFactory {
       TColumnValue blockSizeCol = new TColumnValue();
       blockSizeCol.setString_val(Integer.toString(kuduColumn.getBlockSize()));
       descResult.results.add(new TResultRow(
-          Lists.newArrayList(colNameCol, dataTypeCol, commentCol, pkCol, nullableCol,
-              defaultValCol, encodingCol, compressionCol, blockSizeCol)));
+          Lists.newArrayList(colNameCol, dataTypeCol, commentCol, pkCol, pkUniqueCol,
+              nullableCol, defaultValCol, encodingCol, compressionCol, blockSizeCol)));
     }
     return descResult;
   }
diff --git a/fe/src/main/java/org/apache/impala/service/Frontend.java b/fe/src/main/java/org/apache/impala/service/Frontend.java
index a261f87ef..40469880e 100644
--- a/fe/src/main/java/org/apache/impala/service/Frontend.java
+++ b/fe/src/main/java/org/apache/impala/service/Frontend.java
@@ -605,6 +605,7 @@ public class Frontend {
       if (descStmt.getTable() instanceof FeKuduTable
           && descStmt.getOutputStyle() == TDescribeOutputStyle.MINIMAL) {
         columns.add(new TColumn("primary_key", Type.STRING.toThrift()));
+        columns.add(new TColumn("key_unique", Type.STRING.toThrift()));
         columns.add(new TColumn("nullable", Type.STRING.toThrift()));
         columns.add(new TColumn("default_value", Type.STRING.toThrift()));
         columns.add(new TColumn("encoding", Type.STRING.toThrift()));
diff --git a/fe/src/main/java/org/apache/impala/service/KuduCatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/KuduCatalogOpExecutor.java
index 5d65830c1..901bf3da8 100644
--- a/fe/src/main/java/org/apache/impala/service/KuduCatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/KuduCatalogOpExecutor.java
@@ -123,14 +123,18 @@ public class KuduCatalogOpExecutor {
     }
   }
 
-  private static ColumnSchema createColumnSchema(TColumn column, boolean isKey)
-      throws ImpalaRuntimeException {
+  private static ColumnSchema createColumnSchema(TColumn column, boolean isKey,
+      boolean isKeyUnique) throws ImpalaRuntimeException {
     Type type = Type.fromThrift(column.getColumnType());
     Preconditions.checkState(type != null);
     org.apache.kudu.Type kuduType = KuduUtil.fromImpalaType(type);
 
     ColumnSchemaBuilder csb = new ColumnSchemaBuilder(column.getColumnName(), kuduType);
-    csb.key(isKey);
+    if (isKey && !isKeyUnique) {
+      csb.nonUniqueKey(true);
+    } else {
+      csb.key(isKey);
+    }
     if (column.isSetIs_nullable()) {
       // If nullability is explicitly set and the column is a key, it must have been
       // set as NOT NULL. This is the default, but it is also valid to specify it.
@@ -182,8 +186,9 @@ public class KuduCatalogOpExecutor {
 
     if (!leadingColNames.equals(keyColNames)) {
       throw new ImpalaRuntimeException(String.format(
-          "Kudu PRIMARY KEY columns must be specified as the first columns " +
+          "Kudu %s columns must be specified as the first columns " +
           "in the table (expected leading columns (%s) but found (%s))",
+          KuduUtil.getPrimaryKeyString(params.is_primary_key_unique),
           PrintUtils.joinQuoted(keyColNames),
           PrintUtils.joinQuoted(leadingColNames)));
     }
@@ -191,7 +196,8 @@ public class KuduCatalogOpExecutor {
     List<ColumnSchema> colSchemas = new ArrayList<>(params.getColumnsSize());
     for (TColumn column: params.getColumns()) {
       boolean isKey = colSchemas.size() < keyColNames.size();
-      colSchemas.add(createColumnSchema(column, isKey));
+      boolean isKeyUnique = isKey ? params.is_primary_key_unique : false;
+      colSchemas.add(createColumnSchema(column, isKey, isKeyUnique));
     }
     return new Schema(colSchemas);
   }
@@ -503,7 +509,7 @@ public class KuduCatalogOpExecutor {
       throws ImpalaRuntimeException {
     AlterTableOptions alterTableOptions = new AlterTableOptions();
     for (TColumn column: columns) {
-      alterTableOptions.addColumn(createColumnSchema(column, false));
+      alterTableOptions.addColumn(createColumnSchema(column, false, false));
     }
     String errMsg = "Error adding columns to Kudu table " + tbl.getName();
     alterKuduTable(tbl, alterTableOptions, errMsg);
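For reference, a minimal sketch of declaring a non unique key column with the Kudu
client builder API exercised above; the column names are made up, and the expected
output noted in the comments is an assumption based on this patch rather than
verified client behavior:

    import java.util.Arrays;
    import org.apache.kudu.ColumnSchema;
    import org.apache.kudu.ColumnSchema.ColumnSchemaBuilder;
    import org.apache.kudu.Schema;
    import org.apache.kudu.Type;

    public class NonUniqueKeySchemaSketch {
      public static void main(String[] args) {
        // A non unique key column is declared with nonUniqueKey(true) instead of key(true).
        ColumnSchema id = new ColumnSchemaBuilder("id", Type.INT32)
            .nonUniqueKey(true)
            .nullable(false)
            .build();
        ColumnSchema name = new ColumnSchemaBuilder("name", Type.STRING)
            .key(false)
            .nullable(true)
            .build();
        Schema schema = new Schema(Arrays.asList(id, name));
        // Expected to report a non unique primary key (assumption based on this patch).
        System.out.println(schema.isPrimaryKeyUnique());
      }
    }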
diff --git a/fe/src/main/java/org/apache/impala/util/KuduUtil.java b/fe/src/main/java/org/apache/impala/util/KuduUtil.java
index dcaa4da09..c8da69055 100644
--- a/fe/src/main/java/org/apache/impala/util/KuduUtil.java
+++ b/fe/src/main/java/org/apache/impala/util/KuduUtil.java
@@ -339,10 +339,13 @@ public class KuduUtil {
   }
 
   public static TColumn setColumnOptions(TColumn column, boolean isKey,
-      Boolean isNullable, Encoding encoding, CompressionAlgorithm compression,
-      Expr defaultValue, Integer blockSize, String kuduName) {
+      boolean isPrimaryKeyUnique, Boolean isNullable, boolean isAutoIncrementing,
+      Encoding encoding, CompressionAlgorithm compression, Expr defaultValue,
+      Integer blockSize, String kuduName) {
     column.setIs_key(isKey);
+    column.setIs_primary_key_unique(isPrimaryKeyUnique);
     if (isNullable != null) column.setIs_nullable(isNullable);
+    column.setIs_auto_incrementing(isAutoIncrementing);
     try {
       if (encoding != null) column.setEncoding(toThrift(encoding));
       if (compression != null) column.setCompression(toThrift(compression));
@@ -482,4 +485,12 @@ public class KuduUtil {
   public static int getkuduClientsSize() {
     return kuduClients_.size();
   }
+
+  // Used for generating error and log messages.
+  public static String getPrimaryKeyString(boolean isPrimaryKeyUnique) {
+    StringBuilder sb = new StringBuilder();
+    if (!isPrimaryKeyUnique) sb.append("NON UNIQUE ");
+    sb.append("PRIMARY KEY");
+    return sb.toString();
+  }
 }
diff --git a/fe/src/main/jflex/sql-scanner.flex b/fe/src/main/jflex/sql-scanner.flex
index 8619cf324..6a96ef100 100644
--- a/fe/src/main/jflex/sql-scanner.flex
+++ b/fe/src/main/jflex/sql-scanner.flex
@@ -188,6 +188,7 @@ import org.apache.impala.thrift.TReservedWordsVersion;
     keywordMap.put("merge_fn", SqlParserSymbols.KW_MERGE_FN);
     keywordMap.put("metadata", SqlParserSymbols.KW_METADATA);
     keywordMap.put("minus", SqlParserSymbols.KW_MINUS);
+    keywordMap.put("non", SqlParserSymbols.KW_NON);
     keywordMap.put("norely", SqlParserSymbols.KW_NORELY);
     keywordMap.put("not", SqlParserSymbols.KW_NOT);
     keywordMap.put("novalidate", SqlParserSymbols.KW_NOVALIDATE);
@@ -274,6 +275,7 @@ import org.apache.impala.thrift.TReservedWordsVersion;
     keywordMap.put("unbounded", SqlParserSymbols.KW_UNBOUNDED);
     keywordMap.put("uncached", SqlParserSymbols.KW_UNCACHED);
     keywordMap.put("union", SqlParserSymbols.KW_UNION);
+    keywordMap.put("unique", SqlParserSymbols.KW_UNIQUE);
     keywordMap.put("unknown", SqlParserSymbols.KW_UNKNOWN);
     keywordMap.put("unnest", SqlParserSymbols.KW_UNNEST);
     keywordMap.put("unset", SqlParserSymbols.KW_UNSET);
@@ -393,7 +395,7 @@ import org.apache.impala.thrift.TReservedWordsVersion;
         "substring_regex", "succeeds", "sum", "symmetric", "system", "system_time",
         "system_user", "tan", "tanh", "time", "timezone_hour", "timezone_minute",
         "trailing", "translate", "translate_regex", "translation", "treat", "trigger",
-        "trim", "trim_array", "uescape", "unique", "unknown", "update  ",
+        "trim", "trim_array", "uescape", "unknown", "update  ",
         "upper", "user", "value", "value_of", "var_pop", "var_samp", "varbinary",
         "varying", "versioning", "whenever", "width_bucket", "window", "within",
         "without", "year"}));
diff --git a/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java b/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java
index 457e2dded..30f541c03 100755
--- a/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java
@@ -442,8 +442,12 @@ public class AnalyzeDDLTest extends FrontendTestBase {
     // Cannot ALTER ADD COLUMN primary key on Kudu table.
     AnalysisError("alter table functional_kudu.alltypes add column " +
         "new_col int primary key",
-        "Cannot add a primary key using an ALTER TABLE ADD COLUMNS statement: " +
+        "Cannot add a PRIMARY KEY using an ALTER TABLE ADD COLUMNS statement: " +
         "new_col INT PRIMARY KEY");
+    AnalysisError("alter table functional_kudu.alltypes add column " +
+        "new_col int non unique primary key",
+        "Cannot add a NON UNIQUE PRIMARY KEY using an ALTER TABLE ADD COLUMNS " +
+        "statement: new_col INT NON UNIQUE PRIMARY KEY");
 
     // A non-null column must have a default on Kudu table.
     AnalysisError("alter table functional_kudu.alltypes add column new_col int not null",
@@ -518,8 +522,12 @@ public class AnalyzeDDLTest extends FrontendTestBase {
     // Cannot ALTER ADD COLUMNS primary key on Kudu table.
     AnalysisError("alter table functional_kudu.alltypes add columns " +
         "(new_col int primary key)",
-        "Cannot add a primary key using an ALTER TABLE ADD COLUMNS statement: " +
+        "Cannot add a PRIMARY KEY using an ALTER TABLE ADD COLUMNS statement: " +
         "new_col INT PRIMARY KEY");
+    AnalysisError("alter table functional_kudu.alltypes add columns " +
+        "(new_col int non unique primary key)",
+        "Cannot add a NON UNIQUE PRIMARY KEY using an ALTER TABLE ADD COLUMNS " +
+        "statement: new_col INT NON UNIQUE PRIMARY KEY");
 
     // A non-null column must have a default on Kudu table.
     AnalysisError("alter table functional_kudu.alltypes add columns" +
@@ -2295,6 +2303,10 @@ public class AnalyzeDDLTest extends FrontendTestBase {
         " stored as kudu as select id, bool_col, tinyint_col, smallint_col, int_col, " +
         "bigint_col, float_col, double_col, date_string_col, string_col " +
         "from functional.alltypestiny");
+    AnalyzesOk("create table t non unique primary key (id) partition by hash (id) " +
+        "partitions 3 stored as kudu as select id, bool_col, tinyint_col, " +
+        "smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, " +
+        "string_col from functional.alltypestiny");
     AnalyzesOk("create table t primary key (id) partition by range (id) " +
         "(partition values < 10, partition 20 <= values < 30, partition value = 50) " +
         "stored as kudu as select id, bool_col, tinyint_col, smallint_col, int_col, " +
@@ -2349,6 +2361,21 @@ public class AnalyzeDDLTest extends FrontendTestBase {
         " stored as kudu as SELECT INT_COL, SMALLINT_COL, ID, BIGINT_COL," +
         " DATE_STRING_COL, STRING_COL, TIMESTAMP_COL, YEAR, MONTH FROM " +
         " functional.alltypes");
+    AnalyzesOk("create table part_kudu_tbl non unique primary key(INT_COL, " +
+        "SMALLINT_COL, ID) partition by hash(INT_COL, SMALLINT_COL, ID) " +
+        "PARTITIONS 2 stored as kudu as SELECT INT_COL, SMALLINT_COL, ID, " +
+        "BIGINT_COL, DATE_STRING_COL, STRING_COL, TIMESTAMP_COL, YEAR, " +
+        "MONTH FROM functional.alltypes");
+    AnalyzesOk("create table part_kudu_tbl non unique primary key(INT_COL, " +
+        "SMALLINT_COL, ID, BIGINT_COL, DATE_STRING_COL, STRING_COL, TIMESTAMP_COL, " +
+        "YEAR, MONTH) partition by hash(INT_COL, SMALLINT_COL, ID) " +
+        "PARTITIONS 2 stored as kudu as SELECT INT_COL, SMALLINT_COL, ID, " +
+        "BIGINT_COL, DATE_STRING_COL, STRING_COL, TIMESTAMP_COL, YEAR, " +
+        "MONTH FROM functional.alltypes");
+    AnalyzesOk("create table no_part_kudu_tbl non unique primary key(INT_COL, " +
+        "SMALLINT_COL, ID) stored as kudu as SELECT INT_COL, SMALLINT_COL, ID, " +
+        "BIGINT_COL, DATE_STRING_COL, STRING_COL, TIMESTAMP_COL, YEAR, " +
+        "MONTH FROM functional.alltypes");
 
     // IMPALA-7679: Inserting a null column type without an explicit type should
     // throw an error.
diff --git a/fe/src/test/java/org/apache/impala/analysis/AnalyzeKuduDDLTest.java b/fe/src/test/java/org/apache/impala/analysis/AnalyzeKuduDDLTest.java
index a75eb581e..5f90441d7 100644
--- a/fe/src/test/java/org/apache/impala/analysis/AnalyzeKuduDDLTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/AnalyzeKuduDDLTest.java
@@ -84,6 +84,32 @@ public class AnalyzeKuduDDLTest extends FrontendTestBase {
         "partition by hash(y) partitions 8 stored as kudu", isExternalPurgeTbl);
     AnalyzesOk("create table tab (x timestamp, y timestamp, primary key(x)) " +
         "partition by hash(x) partitions 8 stored as kudu", isExternalPurgeTbl);
+    // Test non unique primary key
+    AnalyzesOk("create table tab (x int non unique primary key) partition by hash(x) " +
+        "partitions 8 stored as kudu", isExternalPurgeTbl);
+    AnalyzesOk("create table tab (x int, non unique primary key(x)) " +
+        "partition by hash(x) partitions 8 stored as kudu", isExternalPurgeTbl);
+    AnalyzesOk("create table tab (x int, y int, non unique primary key (x, y)) " +
+        "partition by hash(x, y) partitions 8 stored as kudu", isExternalPurgeTbl);
+    AnalyzesOk("create table tab (x int, y int, non unique primary key (x)) " +
+        "partition by range (partition values < 10, partition 10 <= values < 30, " +
+        "partition 30 <= values) stored as kudu tblproperties(" +
+        "'kudu.num_tablet_replicas' = '3')", isExternalPurgeTbl);
+    AnalyzesOk("create table tab (x int, y int, non unique primary key(x, y)) " +
+        "stored as kudu", isExternalPurgeTbl);
+    AnalyzesOk("create table tab (x timestamp, y timestamp, non unique primary key(x))" +
+        " partition by hash(x) partitions 8 stored as kudu", isExternalPurgeTbl);
+    // Partition columns are promoted as non unique primary key columns if no primary
+    // key is declared; they must be the first columns of the table.
+    AnalyzesOk("create table tab (x int, y int) partition by hash(x) partitions 8 " +
+        "stored as kudu", isExternalPurgeTbl);
+    AnalyzesOk("create table tab (x int, y int) partition by hash(x, y) partitions 8 " +
+        "stored as kudu", isExternalPurgeTbl);
+    AnalysisError("create table tab (x int, y int) partition by hash(y) partitions 8 " +
+        "stored as kudu", "Specify primary key or non unique primary key for the Kudu " +
+        "table, or create partitions with the beginning columns of the table.",
+        isExternalPurgeTbl);
+
     AnalyzesOk("create table tab (x int, y string, primary key (x)) partition by " +
         "hash (x) partitions 3, range (x) (partition values < 1, partition " +
         "1 <= values < 10, partition 10 <= values < 20, partition value = 30) " +
@@ -131,6 +157,8 @@ public class AnalyzeKuduDDLTest extends FrontendTestBase {
     // Key column in upper case
     AnalyzesOk("create table tab (x int, y int, primary key (X)) " +
         "partition by hash (x) partitions 8 stored as kudu", isExternalPurgeTbl);
+    AnalyzesOk("create table tab (x int, y int, non unique primary key (X)) " +
+        "partition by hash (x) partitions 8 stored as kudu", isExternalPurgeTbl);
     // Flexible Partitioning
     AnalyzesOk("create table tab (a int, b int, c int, d int, primary key (a, b, c))" +
         "partition by hash (a, b) partitions 8, hash(c) partitions 2 stored as " +
@@ -164,19 +192,47 @@ public class AnalyzeKuduDDLTest extends FrontendTestBase {
     AnalysisError("create table tab (x int, y int, primary key (z)) " +
         "partition by hash (x) partitions 8 stored as kudu",
         "PRIMARY KEY column 'z' does not exist in the table", isExternalPurgeTbl);
+    AnalysisError("create table tab (x int, y int, non unique primary key (z)) " +
+        "partition by hash (x) partitions 8 stored as kudu",
+        "NON UNIQUE PRIMARY KEY column 'z' does not exist in the table",
+        isExternalPurgeTbl);
     // Invalid composite primary key
     AnalysisError("create table tab (x int primary key, primary key(x)) stored " +
-        "as kudu", "Multiple primary keys specified. Composite primary keys can " +
+        "as kudu", "Multiple PRIMARY KEYS specified. Composite PRIMARY KEY can " +
         "be specified using the PRIMARY KEY (col1, col2, ...) syntax at the end " +
         "of the column definition.", isExternalPurgeTbl);
     AnalysisError("create table tab (x int primary key, y int primary key) stored " +
-        "as kudu", "Multiple primary keys specified. Composite primary keys can " +
+        "as kudu", "Multiple PRIMARY KEYS specified. Composite PRIMARY KEY can " +
         "be specified using the PRIMARY KEY (col1, col2, ...) syntax at the end " +
         "of the column definition.", isExternalPurgeTbl);
+    // Invalid composite non unique primary key
+    AnalysisError("create table tab (x int non unique primary key, " +
+        "non unique primary key(x)) stored as kudu",
+        "Multiple NON UNIQUE PRIMARY KEYS specified. Composite NON UNIQUE PRIMARY KEY " +
+        "can be specified using the NON UNIQUE PRIMARY KEY (col1, col2, ...) syntax at " +
+        "the end of the column definition.", isExternalPurgeTbl);
+    AnalysisError("create table tab (x int non unique primary key, " +
+        "y int non unique primary key) stored as kudu",
+        "Multiple NON UNIQUE PRIMARY KEYS specified. Composite NON UNIQUE PRIMARY KEY " +
+        "can be specified using the NON UNIQUE PRIMARY KEY (col1, col2, ...) syntax at " +
+        "the end of the column definition.", isExternalPurgeTbl);
+    AnalysisError("create table tab (x int non unique primary key, " +
+        "y int primary key) stored as kudu",
+        "Multiple NON UNIQUE PRIMARY KEYS specified. Composite NON UNIQUE PRIMARY KEY " +
+        "can be specified using the NON UNIQUE PRIMARY KEY (col1, col2, ...) syntax at " +
+        "the end of the column definition.", isExternalPurgeTbl);
+    AnalysisError("create table tab (x int primary key, y int non unique primary key) " +
+        "stored as kudu", "Multiple PRIMARY KEYS specified. Composite PRIMARY KEY " +
+        "can be specified using the PRIMARY KEY (col1, col2, ...) syntax at " +
+        "the end of the column definition.", isExternalPurgeTbl);
     // Specifying the same primary key column multiple times
     AnalysisError("create table tab (x int, primary key (x, x)) partition by hash (x) " +
         "partitions 8 stored as kudu",
         "Column 'x' is listed multiple times as a PRIMARY KEY.", isExternalPurgeTbl);
+    AnalysisError("create table tab (x int, non unique primary key (x, x)) " +
+        "partition by hash (x) partitions 8 stored as kudu",
+        "Column 'x' is listed multiple times as a NON UNIQUE PRIMARY KEY.",
+        isExternalPurgeTbl);
     // Number of range partition boundary values should be equal to the number of range
     // columns.
     AnalysisError("create table tab (a int, b int, c int, d int, primary key(a, b, c)) " +
@@ -334,7 +390,17 @@ public class AnalyzeKuduDDLTest extends FrontendTestBase {
                   "%s encoding %s compression %s %s %s) partition by hash (x) " +
                   "partitions 3 stored as kudu", nul, enc, comp, def, block);
               if (nul.equals("null")) {
-                AnalysisError(createTblStr, "Primary key columns cannot be nullable",
+                AnalysisError(createTblStr, "PRIMARY KEY columns cannot be nullable",
+                    isExternalPurgeTbl);
+              } else {
+                AnalyzesOk(createTblStr, isExternalPurgeTbl);
+              }
+              createTblStr = String.format("create table tab (x int non unique primary " +
+                  "key %s encoding %s compression %s %s %s) partition by hash (x) " +
+                  "partitions 3 stored as kudu", nul, enc, comp, def, block);
+              if (nul.equals("null")) {
+                AnalysisError(
+                    createTblStr, "NON UNIQUE PRIMARY KEY columns cannot be nullable",
                     isExternalPurgeTbl);
               } else {
                 AnalyzesOk(createTblStr, isExternalPurgeTbl);
@@ -360,13 +426,25 @@ public class AnalyzeKuduDDLTest extends FrontendTestBase {
         "compression snappy block_size 1, y int null encoding rle compression lz4 " +
         "default 1, primary key(x)) partition by hash (x) partitions 3 " +
         "stored as kudu", isExternalPurgeTbl);
+    AnalyzesOk("create table tab (x int not null encoding plain_encoding " +
+        "compression snappy block_size 1, y int null encoding rle compression lz4 " +
+        "default 1, non unique primary key(x)) partition by hash (x) partitions 3 " +
+        "stored as kudu", isExternalPurgeTbl);
     // Primary keys can't be null
     AnalysisError("create table tab (x int primary key null, y int not null) " +
-        "partition by hash (x) partitions 3 stored as kudu", "Primary key columns " +
+        "partition by hash (x) partitions 3 stored as kudu", "PRIMARY KEY columns " +
         "cannot be nullable: x INT PRIMARY KEY NULL", isExternalPurgeTbl);
     AnalysisError("create table tab (x int not null, y int null, primary key (x, y)) " +
-        "partition by hash (x) partitions 3 stored as kudu", "Primary key columns " +
+        "partition by hash (x) partitions 3 stored as kudu", "PRIMARY KEY columns " +
         "cannot be nullable: y INT NULL", isExternalPurgeTbl);
+    AnalysisError("create table tab (x int non unique primary key null, " +
+        "y int not null) partition by hash (x) partitions 3 stored as kudu",
+        "NON UNIQUE PRIMARY KEY columns cannot be nullable: " +
+        "x INT NON UNIQUE PRIMARY KEY NULL", isExternalPurgeTbl);
+    AnalysisError("create table tab (x int not null, y int null, " +
+        "non unique primary key (x, y)) partition by hash (x) partitions 3 " +
+        "stored as kudu", "NON UNIQUE PRIMARY KEY columns cannot be nullable: " +
+        "y INT NULL", isExternalPurgeTbl);
     // Unsupported encoding value
     AnalysisError("create table tab (x int primary key, y int encoding invalid_enc) " +
         "partition by hash (x) partitions 3 stored as kudu", "Unsupported encoding " +
@@ -635,8 +713,11 @@ public class AnalyzeKuduDDLTest extends FrontendTestBase {
         "a STRUCT<f1:INT>");
     // Add primary key
     AnalysisError("alter table functional_kudu.testtbl add columns (a int primary key)",
-        "Cannot add a primary key using an ALTER TABLE ADD COLUMNS statement: " +
+        "Cannot add a PRIMARY KEY using an ALTER TABLE ADD COLUMNS statement: " +
         "a INT PRIMARY KEY");
+    AnalysisError("alter table functional_kudu.testtbl add columns (a int non unique " +
+        "primary key)", "Cannot add a NON UNIQUE PRIMARY KEY using an ALTER TABLE ADD " +
+        "COLUMNS statement: a INT NON UNIQUE PRIMARY KEY");
     // Columns requiring a default value
     AnalyzesOk("alter table functional_kudu.testtbl add columns (a1 int not null " +
         "default 10)");
diff --git a/fe/src/test/java/org/apache/impala/analysis/ParserTest.java b/fe/src/test/java/org/apache/impala/analysis/ParserTest.java
index 83630c144..3da74e15b 100755
--- a/fe/src/test/java/org/apache/impala/analysis/ParserTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/ParserTest.java
@@ -2794,10 +2794,19 @@ public class ParserTest extends FrontendTestBase {
     ParsesOk("CREATE TABLE foo (i INT, j INT, PRIMARY KEY (j, i)) STORED AS KUDU");
     ParsesOk("CREATE TABLE foo (i INT PRIMARY KEY, PRIMARY KEY(i)) STORED AS KUDU");
     ParsesOk("CREATE TABLE foo (i INT PRIMARY KEY, j INT PRIMARY KEY) STORED AS KUDU");
+    ParsesOk("CREATE TABLE foo (i INT NON UNIQUE PRIMARY KEY) STORED AS KUDU");
+    ParsesOk("CREATE TABLE foo (i INT NON UNIQUE PRIMARY KEY, "
+        + "NON UNIQUE PRIMARY KEY(i)) STORED AS KUDU");
+    ParsesOk("CREATE TABLE foo (i INT, j INT, NON UNIQUE PRIMARY KEY (i, j)) "
+        + "STORED AS KUDU");
+    ParsesOk("CREATE TABLE foo (i INT NON UNIQUE PRIMARY KEY, "
+        + "j INT NON UNIQUE PRIMARY KEY) STORED AS KUDU");
     ParserError("CREATE TABLE foo (i INT) PRIMARY KEY (i) STORED AS KUDU");
     ParserError("CREATE TABLE foo (i INT, PRIMARY KEY) STORED AS KUDU");
     ParserError("CREATE TABLE foo (PRIMARY KEY(a), a INT) STORED AS KUDU");
-    ParserError("CREATE TABLE foo (i INT) PRIMARY KEY (i) STORED AS KUDU");
+    ParserError("CREATE TABLE foo (i INT) NON UNIQUE PRIMARY KEY (i) STORED AS KUDU");
+    ParserError("CREATE TABLE foo (i INT, NON UNIQUE PRIMARY KEY) STORED AS KUDU");
+    ParserError("CREATE TABLE foo (NON UNIQUE PRIMARY KEY(a), a INT) STORED AS KUDU");
 
     // Supported storage engines
     ParsesOk("CREATE TABLE foo (i INT) STORED BY KUDU");
@@ -3044,6 +3053,8 @@ public class ParserTest extends FrontendTestBase {
                   "%s %s %s %s %s) STORED AS KUDU", enc, comp, def, block, nul));
               ParsesOk(String.format("CREATE TABLE Foo (i int PRIMARY KEY " +
                   "%s %s %s %s %s) STORED AS KUDU", enc, comp, block, def, nul));
+              ParsesOk(String.format("CREATE TABLE Foo (i int NON UNIQUE PRIMARY KEY " +
+                  "%s %s %s %s %s) STORED AS KUDU", nul, enc, comp, def, block));
             }
           }
         }
diff --git a/testdata/workloads/functional-query/queries/QueryTest/kudu-scan-node.test b/testdata/workloads/functional-query/queries/QueryTest/kudu-scan-node.test
index 286be0d4b..5f3d7c4a3 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/kudu-scan-node.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/kudu-scan-node.test
@@ -170,4 +170,85 @@ select count(*) from functional_kudu.alltypes where rand() + id < 0.0;
 0
 ---- TYPES
 BIGINT
+====
+---- QUERY
+# Create Kudu table with non unique primary key
+create table non_unique_key_scan_tbl1 non unique primary key (id)
+partition by range (id)
+(partition value = 0, partition value = 1,
+ partition value = 2, partition value = 3,
+ partition value = 4, partition value = 5,
+ partition value = 6, partition value = 7)
+stored as kudu
+as select id, int_col from functional.alltypestiny;
+---- RESULTS
+'Inserted 8 row(s)'
+====
+---- QUERY
+# auto-incrementing column is not shown for SELECT *
+select * from non_unique_key_scan_tbl1 order by id asc;
+---- RESULTS
+0,0
+1,1
+2,0
+3,1
+4,0
+5,1
+6,0
+7,1
+---- TYPES
+INT,INT
+====
+---- QUERY
+# auto-incrementing column is shown when the column is specified in SELECT statement
+select id, int_col, auto_incrementing_id from non_unique_key_scan_tbl1 order by id asc,
+auto_incrementing_id desc;
+---- RESULTS
+0,0,1
+1,1,1
+2,0,1
+3,1,1
+4,0,1
+5,1,1
+6,0,1
+7,1,1
+---- TYPES
+INT,INT,BIGINT
+====
+---- QUERY
+# Query with auto-incrementing column in where clause
+select id, int_col, auto_incrementing_id from non_unique_key_scan_tbl1
+where auto_incrementing_id = 1 and id < 3
+group by id, int_col, auto_incrementing_id;
+---- RESULTS
+0,0,1
+1,1,1
+2,0,1
+---- TYPES
+INT,INT,BIGINT
+====
+---- QUERY
+# Create unpartitioned Kudu table with non unique primary key.
+create table non_unique_key_scan_tbl2 non unique primary key (id)
+stored as kudu
+as select id, int_col from functional.alltypestiny order by id asc limit 100;
+---- RESULTS
+'Inserted 8 row(s)'
+====
+---- QUERY
+# Query with auto-incrementing column in ORDER BY.
+# All rows are added to one tablet-server so auto_incrementing_id shows insertion order.
+select id, int_col, auto_incrementing_id from non_unique_key_scan_tbl2
+order by auto_incrementing_id asc;
+---- RESULTS
+0,0,1
+1,1,2
+2,0,3
+3,1,4
+4,0,5
+5,1,6
+6,0,7
+7,1,8
+---- TYPES
+INT,INT,BIGINT
 ====
\ No newline at end of file
diff --git a/testdata/workloads/functional-query/queries/QueryTest/kudu_alter.test b/testdata/workloads/functional-query/queries/QueryTest/kudu_alter.test
index 162e7684f..63e5c6a45 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/kudu_alter.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/kudu_alter.test
@@ -347,6 +347,12 @@ alter table tbl_to_alter add columns (invalid_col int not null)
 A new non-null column must have a default value
 ====
 ---- QUERY
+# Add a column whose name is reserved by the Kudu engine
+alter table tbl_to_alter add columns (auto_incrementing_id bigint)
+---- CATCH
+Column name auto_incrementing_id is reserved by Kudu engine
+====
+---- QUERY
 # Drop a column
 alter table tbl_to_alter drop column vali
 ---- RESULTS
@@ -577,16 +583,16 @@ alter table kudu_tbl_to_alter alter column new_col4
   set encoding plain_encoding compression lz4 block_size 1000;
 describe kudu_tbl_to_alter;
 ---- LABELS
-NAME,TYPE,COMMENT,PRIMARY_KEY,NULLABLE,DEFAULT_VALUE,ENCODING,COMPRESSION,BLOCK_SIZE
----- RESULTS
-'id','int','','true','false','','RLE','SNAPPY','100'
-'last_name','string','','false','true','','AUTO_ENCODING','DEFAULT_COMPRESSION','0'
-'name','string','','false','true','name_default','AUTO_ENCODING','DEFAULT_COMPRESSION','0'
-'new_col1','int','','false','false','','AUTO_ENCODING','DEFAULT_COMPRESSION','0'
-'new_col2','bigint','','false','false','','AUTO_ENCODING','DEFAULT_COMPRESSION','0'
-'new_col4','int','','false','true','-1','PLAIN_ENCODING','LZ4','1000'
+NAME,TYPE,COMMENT,PRIMARY_KEY,KEY_UNIQUE,NULLABLE,DEFAULT_VALUE,ENCODING,COMPRESSION,BLOCK_SIZE
+---- RESULTS
+'id','int','','true','true','false','','RLE','SNAPPY','100'
+'last_name','string','','false','','true','','AUTO_ENCODING','DEFAULT_COMPRESSION','0'
+'name','string','','false','','true','name_default','AUTO_ENCODING','DEFAULT_COMPRESSION','0'
+'new_col1','int','','false','','false','','AUTO_ENCODING','DEFAULT_COMPRESSION','0'
+'new_col2','bigint','','false','','false','','AUTO_ENCODING','DEFAULT_COMPRESSION','0'
+'new_col4','int','','false','','true','-1','PLAIN_ENCODING','LZ4','1000'
 ---- TYPES
-STRING,STRING,STRING,STRING,STRING,STRING,STRING,STRING,STRING
+STRING,STRING,STRING,STRING,STRING,STRING,STRING,STRING,STRING,STRING
 ====
 ---- QUERY
 # check that we can insert and scan after the storage attribute changes
@@ -664,4 +670,57 @@ describe formatted external_tbl_new;
 '','kudu.table_name     ','impala::$DATABASE.temp_kudu_table'
 ---- TYPES
 STRING,STRING,STRING
-----
\ No newline at end of file
+====
+---- QUERY
+# Create Kudu table with non unique primary key
+create table alter_non_unique_key_test (id int non unique primary key, name string)
+partition by hash (id) partitions 3
+stored as kudu
+---- RESULTS
+'Table has been created.'
+====
+---- QUERY
+alter table alter_non_unique_key_test add columns (new_col1 int not null default 10)
+---- RESULTS
+'Column(s) have been added.'
+====
+---- QUERY
+alter table alter_non_unique_key_test add columns (auto_incrementing_id bigint not null default 10)
+---- CATCH
+AnalysisException: Column already exists: auto_incrementing_id
+====
+---- QUERY
+alter table alter_non_unique_key_test drop column new_col1
+---- RESULTS
+'Column has been dropped.'
+====
+---- QUERY
+alter table alter_non_unique_key_test drop column id
+---- CATCH
+NonRecoverableException: cannot remove a key column: id
+====
+---- QUERY
+alter table alter_non_unique_key_test alter column id set default 10
+---- CATCH
+AnalysisException: Cannot set default value for primary key column 'id'
+====
+---- QUERY
+alter table alter_non_unique_key_test alter column id set block_size 100
+---- RESULTS
+'Column has been altered.'
+====
+---- QUERY
+alter table alter_non_unique_key_test drop column auto_incrementing_id
+---- CATCH
+IllegalArgumentException: Cannot remove auto-incrementing column auto_incrementing_id
+====
+---- QUERY
+alter table alter_non_unique_key_test alter column auto_incrementing_id set default 10
+---- CATCH
+AnalysisException: Cannot set default value for system generated primary key column 'auto_incrementing_id'
+====
+---- QUERY
+alter table alter_non_unique_key_test alter column auto_incrementing_id set block_size 1
+---- RESULTS
+'Column has been altered.'
+====
\ No newline at end of file
diff --git a/testdata/workloads/functional-query/queries/QueryTest/kudu_create.test b/testdata/workloads/functional-query/queries/QueryTest/kudu_create.test
index 7c151559f..add08cf85 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/kudu_create.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/kudu_create.test
@@ -482,4 +482,165 @@ describe formatted kudu_stored_by;
 '','storage_handler     ','org.apache.hadoop.hive.kudu.KuduStorageHandler'
 ---- TYPES
 string, string, string
+====
+---- QUERY
+# Create Kudu table with non unique primary key
+create table non_unique_key_create_tbl1 (id int non unique primary key, name string)
+partition by hash (id) partitions 3
+stored as kudu
+---- RESULTS
+'Table has been created.'
+====
+---- QUERY
+insert into non_unique_key_create_tbl1 values (1,'Martin'), (2,'Smith');
+---- RESULTS
+: 2
+---- RUNTIME_PROFILE
+NumModifiedRows: 2
+NumRowErrors: 0
+====
+---- QUERY
+# auto-incrementing column is not shown for "select *"
+select * from non_unique_key_create_tbl1 where name = 'Martin';
+---- RESULTS
+1,'Martin'
+---- TYPES
+INT,STRING
+====
+---- QUERY
+# Create Kudu table with non unique composite primary key
+create table non_unique_key_create_tbl2 (a int, b string, non unique primary key(a, b))
+partition by hash (a) partitions 3
+stored as kudu
+---- RESULTS
+'Table has been created.'
+====
+---- QUERY
+# Create Kudu table with non unique composite primary key
+create table non_unique_key_create_tbl3 (a string, b int, non unique primary key(a, b))
+partition by hash (a) partitions 3
+stored as kudu
+---- RESULTS
+'Table has been created.'
+====
+---- QUERY
+# Create Kudu table without primary key columns,
+# partition columns will be promoted as non unique primary key columns.
+create table promote_partition_keys_as_non_unique_keys_test (a int, b string, c float)
+partition by hash (a, b) partitions 3
+stored as kudu;
+---- RESULTS
+'Table has been created.'
+---- ERRORS
+Partition columns (a, b) are promoted as non unique primary key.
+====
+---- QUERY
+# Create Kudu table without primary key columns,
+# partition columns cannot be promoted as non unique primary key columns since the columns
+# are not beginning columns of the table.
+create table partition_keys_not_promoted_test (a int, b string, c float)
+partition by hash (b, c) partitions 3
+stored as kudu;
+---- CATCH
+AnalysisException: Specify primary key or non unique primary key for the Kudu table, or create partitions with the beginning columns of the table.
+====
+---- QUERY
+# Create unpartitioned Kudu table with non unique primary key column.
+create table non_unique_key_create_tbl4 (a int non unique primary key, b string)
+stored as kudu;
+---- RESULTS
+'Table has been created.'
+====
+---- QUERY
+# Create Kudu table in CTAS statement with non unique primary key
+create table non_unique_key_create_tbl5 non unique primary key (id)
+partition by hash (id) partitions 3
+stored as kudu
+as select id, int_col from functional.alltypestiny;
+select * from non_unique_key_create_tbl5 order by id asc;
+---- RESULTS
+0,0
+1,1
+2,0
+3,1
+4,0
+5,1
+6,0
+7,1
+---- TYPES
+INT,INT
+====
+---- QUERY
+# Create Kudu table in CTAS statement with non unique primary key and range partitions
+create table non_unique_key_create_tbl6 non unique primary key (id)
+partition by range (id) (partition values <= 1, partition 1 < values <= 3,
+  partition 3 < values <= 5, partition 5 < values)
+stored as kudu
+as select id, int_col from functional.alltypestiny order by id asc limit 100;
+select id, int_col, auto_incrementing_id from non_unique_key_create_tbl6 order by id asc;
+---- RESULTS
+0,0,1
+1,1,2
+2,0,1
+3,1,2
+4,0,1
+5,1,2
+6,0,1
+7,1,2
+---- TYPES
+INT,INT,BIGINT
+====
+---- QUERY
+# Non unique primary key for non Kudu table
+create table non_unique_key_create_tbl7 (x int, y boolean, non unique primary key(x, y))
+---- CATCH
+AnalysisException: NON UNIQUE PRIMARY KEY is only supported for Kudu
+====
+---- QUERY
+# Non unique primary key columns must be declared as the first columns in the table
+create table non_unique_key_create_tbl8 (x int, y int, non unique primary key(y))
+  partition by hash (y) partitions 3 stored as kudu
+---- CATCH
+ImpalaRuntimeException: Kudu NON UNIQUE PRIMARY KEY columns must be specified as the first columns in the table (expected leading columns ('y') but found ('x'))
+====
+---- QUERY
+# Non unique primary key columns must be listed in the same order as the column definitions
+create table non_unique_key_create_tbl9 (x int, y int, z int, non unique primary key(y,x))
+  partition by hash (y) partitions 3 stored as kudu
+---- CATCH
+ImpalaRuntimeException: Kudu NON UNIQUE PRIMARY KEY columns must be specified as the first columns in the table (expected leading columns ('y', 'x') but found ('x', 'y'))
+====
+---- QUERY
+# Non unique primary key cannot be boolean type
+create table non_unique_key_create_tbl10 (x boolean non unique primary key)
+  partition by hash(x) partitions 8 stored as kudu
+---- CATCH
+NonRecoverableException: key column may not have type of BOOL, FLOAT, or DOUBLE
+====
+---- QUERY
+# Non unique primary key cannot be float type
+create table non_unique_key_create_tbl11 (x float non unique primary key)
+  partition by hash(x) partitions 8 stored as kudu
+---- CATCH
+NonRecoverableException: key column may not have type of BOOL, FLOAT, or DOUBLE
+====
+---- QUERY
+# Non unique primary key cannot be double type
+create table non_unique_key_create_tbl12 (x double non unique primary key)
+  partition by hash(x) partitions 8 stored as kudu
+---- CATCH
+NonRecoverableException: key column may not have type of BOOL, FLOAT, or DOUBLE
+====
+---- QUERY
+# Cannot create a Kudu table without any key or partition columns
+create table non_unique_key_create_tbl13 (x int) stored as kudu
+---- CATCH
+AnalysisException: A primary key is required for a Kudu table.
+====
+---- QUERY
+# Cannot create a Kudu table with a column named "auto_incrementing_id"
+create table non_unique_key_create_tbl14 (id int primary key, auto_incrementing_id bigint)
+  partition by hash(id) partitions 3 stored as kudu
+---- CATCH
+IllegalArgumentException: Column name auto_incrementing_id is reserved by Kudu engine
 ====
\ No newline at end of file
diff --git a/testdata/workloads/functional-query/queries/QueryTest/kudu_delete.test b/testdata/workloads/functional-query/queries/QueryTest/kudu_delete.test
index ceff38d6c..ecbd926d5 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/kudu_delete.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/kudu_delete.test
@@ -410,3 +410,67 @@ select * from impala_3454
 ---- TYPES
 TINYINT,BIGINT
 ====
+---- QUERY
+# Create Kudu table with non unique primary key
+create table delete_non_unique_key_test non unique primary key (id)
+partition by hash (id) partitions 3 stored as kudu
+as select id, int_col from functional.alltypestiny;
+select * from delete_non_unique_key_test order by id;
+---- RESULTS
+0,0
+1,1
+2,0
+3,1
+4,0
+5,1
+6,0
+7,1
+---- TYPES
+INT,INT
+====
+---- QUERY
+# Test a DELETE with non unique primary key
+delete delete_non_unique_key_test where id < 3;
+---- RUNTIME_PROFILE
+NumModifiedRows: 3
+NumRowErrors: 0
+====
+---- QUERY
+select * from delete_non_unique_key_test order by id;
+---- RESULTS
+3,1
+4,0
+5,1
+6,0
+7,1
+---- TYPES
+INT,INT
+====
+---- QUERY
+# Test a DELETE with non key column
+delete delete_non_unique_key_test where int_col = 0;
+---- RUNTIME_PROFILE
+NumModifiedRows: 2
+NumRowErrors: 0
+====
+---- QUERY
+select * from delete_non_unique_key_test order by id;
+---- RESULTS
+3,1
+5,1
+7,1
+---- TYPES
+INT,INT
+====
+---- QUERY
+# Test a DELETE with auto-incrementing column in where clause
+delete delete_non_unique_key_test where auto_incrementing_id < 10;
+---- RUNTIME_PROFILE
+NumModifiedRows: 3
+NumRowErrors: 0
+====
+---- QUERY
+select count(*) from delete_non_unique_key_test;
+---- RESULTS
+0
+====
\ No newline at end of file
diff --git a/testdata/workloads/functional-query/queries/QueryTest/kudu_describe.test b/testdata/workloads/functional-query/queries/QueryTest/kudu_describe.test
index 5d8f0ef55..7b17f76a2 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/kudu_describe.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/kudu_describe.test
@@ -2,23 +2,23 @@
 ---- QUERY
 describe functional_kudu.alltypes
 ---- LABELS
-NAME,TYPE,COMMENT,PRIMARY_KEY,NULLABLE,DEFAULT_VALUE,ENCODING,COMPRESSION,BLOCK_SIZE
+NAME,TYPE,COMMENT,PRIMARY_KEY,KEY_UNIQUE,NULLABLE,DEFAULT_VALUE,ENCODING,COMPRESSION,BLOCK_SIZE
 ---- RESULTS
-'bigint_col','bigint','','false','true','','AUTO_ENCODING','DEFAULT_COMPRESSION','0'
-'bool_col','boolean','','false','true','','AUTO_ENCODING','DEFAULT_COMPRESSION','0'
-'date_string_col','string','','false','true','','AUTO_ENCODING','DEFAULT_COMPRESSION','0'
-'double_col','double','','false','true','','AUTO_ENCODING','DEFAULT_COMPRESSION','0'
-'float_col','float','','false','true','','AUTO_ENCODING','DEFAULT_COMPRESSION','0'
-'id','int','','true','false','','AUTO_ENCODING','DEFAULT_COMPRESSION','0'
-'int_col','int','','false','true','','AUTO_ENCODING','DEFAULT_COMPRESSION','0'
-'month','int','','false','true','','AUTO_ENCODING','DEFAULT_COMPRESSION','0'
-'smallint_col','smallint','','false','true','','AUTO_ENCODING','DEFAULT_COMPRESSION','0'
-'string_col','string','','false','true','','AUTO_ENCODING','DEFAULT_COMPRESSION','0'
-'timestamp_col','timestamp','','false','true','','AUTO_ENCODING','DEFAULT_COMPRESSION','0'
-'tinyint_col','tinyint','','false','true','','AUTO_ENCODING','DEFAULT_COMPRESSION','0'
-'year','int','','false','true','','AUTO_ENCODING','DEFAULT_COMPRESSION','0'
+'bigint_col','bigint','','false','','true','','AUTO_ENCODING','DEFAULT_COMPRESSION','0'
+'bool_col','boolean','','false','','true','','AUTO_ENCODING','DEFAULT_COMPRESSION','0'
+'date_string_col','string','','false','','true','','AUTO_ENCODING','DEFAULT_COMPRESSION','0'
+'double_col','double','','false','','true','','AUTO_ENCODING','DEFAULT_COMPRESSION','0'
+'float_col','float','','false','','true','','AUTO_ENCODING','DEFAULT_COMPRESSION','0'
+'id','int','','true','true','false','','AUTO_ENCODING','DEFAULT_COMPRESSION','0'
+'int_col','int','','false','','true','','AUTO_ENCODING','DEFAULT_COMPRESSION','0'
+'month','int','','false','','true','','AUTO_ENCODING','DEFAULT_COMPRESSION','0'
+'smallint_col','smallint','','false','','true','','AUTO_ENCODING','DEFAULT_COMPRESSION','0'
+'string_col','string','','false','','true','','AUTO_ENCODING','DEFAULT_COMPRESSION','0'
+'timestamp_col','timestamp','','false','','true','','AUTO_ENCODING','DEFAULT_COMPRESSION','0'
+'tinyint_col','tinyint','','false','','true','','AUTO_ENCODING','DEFAULT_COMPRESSION','0'
+'year','int','','false','','true','','AUTO_ENCODING','DEFAULT_COMPRESSION','0'
 ---- TYPES
-STRING,STRING,STRING,STRING,STRING,STRING,STRING,STRING,STRING
+STRING,STRING,STRING,STRING,STRING,STRING,STRING,STRING,STRING,STRING
 ====
 ---- QUERY
 # Test composite primary key and column options.
@@ -34,16 +34,16 @@ partition by hash (pk1) partitions 3
 stored as kudu;
 describe describe_test;
 ---- LABELS
-NAME,TYPE,COMMENT,PRIMARY_KEY,NULLABLE,DEFAULT_VALUE,ENCODING,COMPRESSION,BLOCK_SIZE
+NAME,TYPE,COMMENT,PRIMARY_KEY,KEY_UNIQUE,NULLABLE,DEFAULT_VALUE,ENCODING,COMPRESSION,BLOCK_SIZE
 ---- RESULTS
-'pk1','int','','true','false','','AUTO_ENCODING','DEFAULT_COMPRESSION','0'
-'pk2','int','','true','false','','AUTO_ENCODING','DEFAULT_COMPRESSION','0'
-'pk3','string','','true','false','','AUTO_ENCODING','DEFAULT_COMPRESSION','0'
-'c1','string','testing','false','true','abc','AUTO_ENCODING','DEFAULT_COMPRESSION','0'
-'c2','int','','false','false','100','PLAIN_ENCODING','SNAPPY','0'
-'c3','int','','false','true','','AUTO_ENCODING','DEFAULT_COMPRESSION','8388608'
+'pk1','int','','true','true','false','','AUTO_ENCODING','DEFAULT_COMPRESSION','0'
+'pk2','int','','true','true','false','','AUTO_ENCODING','DEFAULT_COMPRESSION','0'
+'pk3','string','','true','true','false','','AUTO_ENCODING','DEFAULT_COMPRESSION','0'
+'c1','string','testing','false','','true','abc','AUTO_ENCODING','DEFAULT_COMPRESSION','0'
+'c2','int','','false','','false','100','PLAIN_ENCODING','SNAPPY','0'
+'c3','int','','false','','true','','AUTO_ENCODING','DEFAULT_COMPRESSION','8388608'
 ---- TYPES
-STRING,STRING,STRING,STRING,STRING,STRING,STRING,STRING,STRING
+STRING,STRING,STRING,STRING,STRING,STRING,STRING,STRING,STRING,STRING
 ====
 ---- QUERY
 # Test decimal columns and primary key
@@ -56,14 +56,14 @@ create table describe_decimal_test
 stored as kudu;
 describe describe_decimal_test;
 ---- LABELS
-NAME,TYPE,COMMENT,PRIMARY_KEY,NULLABLE,DEFAULT_VALUE,ENCODING,COMPRESSION,BLOCK_SIZE
+NAME,TYPE,COMMENT,PRIMARY_KEY,KEY_UNIQUE,NULLABLE,DEFAULT_VALUE,ENCODING,COMPRESSION,BLOCK_SIZE
 ---- RESULTS
-'decimal_default','decimal(9,0)','','true','false','','AUTO_ENCODING','DEFAULT_COMPRESSION','0'
-'decimal_4','decimal(9,9)','','false','false','','AUTO_ENCODING','DEFAULT_COMPRESSION','0'
-'decimal_8','decimal(18,2)','','false','false','100.00','AUTO_ENCODING','DEFAULT_COMPRESSION','0'
-'decimal_16','decimal(38,0)','','false','true','','AUTO_ENCODING','DEFAULT_COMPRESSION','0'
+'decimal_default','decimal(9,0)','','true','true','false','','AUTO_ENCODING','DEFAULT_COMPRESSION','0'
+'decimal_4','decimal(9,9)','','false','','false','','AUTO_ENCODING','DEFAULT_COMPRESSION','0'
+'decimal_8','decimal(18,2)','','false','','false','100.00','AUTO_ENCODING','DEFAULT_COMPRESSION','0'
+'decimal_16','decimal(38,0)','','false','','true','','AUTO_ENCODING','DEFAULT_COMPRESSION','0'
 ---- TYPES
-STRING,STRING,STRING,STRING,STRING,STRING,STRING,STRING,STRING
+STRING,STRING,STRING,STRING,STRING,STRING,STRING,STRING,STRING,STRING
 ====
 ---- QUERY
 # IMPALA-7781: Test unescaped default column values
@@ -77,15 +77,15 @@ CREATE TABLE IF NOT EXISTS unescaped_str_defaults (
 ) STORED AS KUDU;
 DESCRIBE unescaped_str_defaults;
 ---- LABELS
-NAME,TYPE,COMMENT,PRIMARY_KEY,NULLABLE,DEFAULT_VALUE,ENCODING,COMPRESSION,BLOCK_SIZE
+NAME,TYPE,COMMENT,PRIMARY_KEY,KEY_UNIQUE,NULLABLE,DEFAULT_VALUE,ENCODING,COMPRESSION,BLOCK_SIZE
 ---- RESULTS
-'id','int','','true','false','','AUTO_ENCODING','DEFAULT_COMPRESSION','0'
-'s1','string','','false','true','"','AUTO_ENCODING','DEFAULT_COMPRESSION','0'
-'s2','string','','false','true','''','AUTO_ENCODING','DEFAULT_COMPRESSION','0'
-'s3','string','','false','true','\\"','AUTO_ENCODING','DEFAULT_COMPRESSION','0'
-'s4','string','','false','true','\\''','AUTO_ENCODING','DEFAULT_COMPRESSION','0'
+'id','int','','true','true','false','','AUTO_ENCODING','DEFAULT_COMPRESSION','0'
+'s1','string','','false','','true','"','AUTO_ENCODING','DEFAULT_COMPRESSION','0'
+'s2','string','','false','','true','''','AUTO_ENCODING','DEFAULT_COMPRESSION','0'
+'s3','string','','false','','true','\\"','AUTO_ENCODING','DEFAULT_COMPRESSION','0'
+'s4','string','','false','','true','\\''','AUTO_ENCODING','DEFAULT_COMPRESSION','0'
 ---- TYPES
-STRING,STRING,STRING,STRING,STRING,STRING,STRING,STRING,STRING
+STRING,STRING,STRING,STRING,STRING,STRING,STRING,STRING,STRING,STRING
 ====
 ---- QUERY
 # Test date columns and primary key
@@ -97,13 +97,13 @@ create table describe_date_test
 stored as kudu;
 describe describe_date_test;
 ---- LABELS
-NAME,TYPE,COMMENT,PRIMARY_KEY,NULLABLE,DEFAULT_VALUE,ENCODING,COMPRESSION,BLOCK_SIZE
+NAME,TYPE,COMMENT,PRIMARY_KEY,KEY_UNIQUE,NULLABLE,DEFAULT_VALUE,ENCODING,COMPRESSION,BLOCK_SIZE
 ---- RESULTS
-'date_pk','date','','true','false','','AUTO_ENCODING','DEFAULT_COMPRESSION','0'
-'date_val','date','','false','false','','AUTO_ENCODING','DEFAULT_COMPRESSION','0'
-'date_null','date','','false','true','','AUTO_ENCODING','DEFAULT_COMPRESSION','0'
+'date_pk','date','','true','true','false','','AUTO_ENCODING','DEFAULT_COMPRESSION','0'
+'date_val','date','','false','','false','','AUTO_ENCODING','DEFAULT_COMPRESSION','0'
+'date_null','date','','false','','true','','AUTO_ENCODING','DEFAULT_COMPRESSION','0'
 ---- TYPES
-STRING,STRING,STRING,STRING,STRING,STRING,STRING,STRING,STRING
+STRING,STRING,STRING,STRING,STRING,STRING,STRING,STRING,STRING,STRING
 ====
 ---- QUERY
 # Test varchar columns and primary key
@@ -116,12 +116,100 @@ create table describe_varchar_test
 stored as kudu;
 describe describe_varchar_test;
 ---- LABELS
-NAME,TYPE,COMMENT,PRIMARY_KEY,NULLABLE,DEFAULT_VALUE,ENCODING,COMPRESSION,BLOCK_SIZE
+NAME,TYPE,COMMENT,PRIMARY_KEY,KEY_UNIQUE,NULLABLE,DEFAULT_VALUE,ENCODING,COMPRESSION,BLOCK_SIZE
 ---- RESULTS
-'varchar_pk','varchar(1000)','','true','false','','AUTO_ENCODING','DEFAULT_COMPRESSION','0'
-'varchar_val','varchar(500)','','false','false','','AUTO_ENCODING','DEFAULT_COMPRESSION','0'
-'varchar_default','varchar(200)','','false','false','foo','AUTO_ENCODING','DEFAULT_COMPRESSION','0'
-'varchar_null','varchar(100)','','false','true','','AUTO_ENCODING','DEFAULT_COMPRESSION','0'
+'varchar_pk','varchar(1000)','','true','true','false','','AUTO_ENCODING','DEFAULT_COMPRESSION','0'
+'varchar_val','varchar(500)','','false','','false','','AUTO_ENCODING','DEFAULT_COMPRESSION','0'
+'varchar_default','varchar(200)','','false','','false','foo','AUTO_ENCODING','DEFAULT_COMPRESSION','0'
+'varchar_null','varchar(100)','','false','','true','','AUTO_ENCODING','DEFAULT_COMPRESSION','0'
 ---- TYPES
-STRING,STRING,STRING,STRING,STRING,STRING,STRING,STRING,STRING
+STRING,STRING,STRING,STRING,STRING,STRING,STRING,STRING,STRING,STRING
 ====
+---- QUERY
+# Create Kudu table with non unique primary key.
+# Verify that auto_incrementing_id column is added by Kudu.
+create table describe_non_unique_key_test (key int non unique primary key, name string)
+  partition by hash (key) partitions 3
+  stored as kudu;
+describe describe_non_unique_key_test;
+---- LABELS
+NAME,TYPE,COMMENT,PRIMARY_KEY,KEY_UNIQUE,NULLABLE,DEFAULT_VALUE,ENCODING,COMPRESSION,BLOCK_SIZE
+---- RESULTS
+'key','int','','true','false','false','','AUTO_ENCODING','DEFAULT_COMPRESSION','0'
+'auto_incrementing_id','bigint','','true','false','false','','AUTO_ENCODING','DEFAULT_COMPRESSION','0'
+'name','string','','false','','true','','AUTO_ENCODING','DEFAULT_COMPRESSION','0'
+---- TYPES
+STRING,STRING,STRING,STRING,STRING,STRING,STRING,STRING,STRING,STRING
+====
+---- QUERY
+# Create Kudu table with a non unique composite primary key.
+# Verify that auto_incrementing_id column is added by Kudu.
+create table describe_non_unique_composite_key_test
+  (a int, b string, c float, non unique primary key(a, b))
+  partition by hash (a) partitions 3
+  stored as kudu;
+describe describe_non_unique_composite_key_test;
+---- LABELS
+NAME,TYPE,COMMENT,PRIMARY_KEY,KEY_UNIQUE,NULLABLE,DEFAULT_VALUE,ENCODING,COMPRESSION,BLOCK_SIZE
+---- RESULTS
+'a','int','','true','false','false','','AUTO_ENCODING','DEFAULT_COMPRESSION','0'
+'b','string','','true','false','false','','AUTO_ENCODING','DEFAULT_COMPRESSION','0'
+'auto_incrementing_id','bigint','','true','false','false','','AUTO_ENCODING','DEFAULT_COMPRESSION','0'
+'c','float','','false','','true','','AUTO_ENCODING','DEFAULT_COMPRESSION','0'
+---- TYPES
+STRING,STRING,STRING,STRING,STRING,STRING,STRING,STRING,STRING,STRING
+====
+---- QUERY
+# Create Kudu table in CTAS statement with non unique primary key.
+# Verify that auto_incrementing_id column is added by Kudu.
+create table describe_ctas_non_unique_key_test non unique primary key (id, int_col)
+  partition by hash (id) partitions 3 stored as kudu
+  as select id, int_col, float_col, string_col from functional.alltypestiny;
+describe describe_ctas_non_unique_key_test;
+---- LABELS
+NAME,TYPE,COMMENT,PRIMARY_KEY,KEY_UNIQUE,NULLABLE,DEFAULT_VALUE,ENCODING,COMPRESSION,BLOCK_SIZE
+---- RESULTS
+'id','int','','true','false','false','','AUTO_ENCODING','DEFAULT_COMPRESSION','0'
+'int_col','int','','true','false','false','','AUTO_ENCODING','DEFAULT_COMPRESSION','0'
+'auto_incrementing_id','bigint','','true','false','false','','AUTO_ENCODING','DEFAULT_COMPRESSION','0'
+'float_col','float','','false','','true','','AUTO_ENCODING','DEFAULT_COMPRESSION','0'
+'string_col','string','','false','','true','','AUTO_ENCODING','DEFAULT_COMPRESSION','0'
+---- TYPES
+STRING,STRING,STRING,STRING,STRING,STRING,STRING,STRING,STRING,STRING
+====
+---- QUERY
+# Create Kudu table without primary key columns.
+# Verify that partition columns 'a' and 'b' are promoted as non unique key columns and
+# auto_incrementing_id column is added by Kudu.
+create table describe_promote_partition_keys_as_non_unique_key_test
+  (a int, b string, c float)
+  partition by hash (a, b) partitions 3
+  stored as kudu;
+describe describe_promote_partition_keys_as_non_unique_key_test;
+---- LABELS
+NAME,TYPE,COMMENT,PRIMARY_KEY,KEY_UNIQUE,NULLABLE,DEFAULT_VALUE,ENCODING,COMPRESSION,BLOCK_SIZE
+---- RESULTS
+'a','int','','true','false','false','','AUTO_ENCODING','DEFAULT_COMPRESSION','0'
+'b','string','','true','false','false','','AUTO_ENCODING','DEFAULT_COMPRESSION','0'
+'auto_incrementing_id','bigint','','true','false','false','','AUTO_ENCODING','DEFAULT_COMPRESSION','0'
+'c','float','','false','','true','','AUTO_ENCODING','DEFAULT_COMPRESSION','0'
+---- TYPES
+STRING,STRING,STRING,STRING,STRING,STRING,STRING,STRING,STRING,STRING
+====
+---- QUERY
+# Create Kudu table with a non unique composite primary key, but without partitions.
+# Verify that auto_incrementing_id column is added by Kudu.
+create table describe_non_unique_key_no_partitions_test
+  (a int, b string, c float, non unique primary key(a, b))
+  stored as kudu;
+describe describe_non_unique_key_no_partitions_test;
+---- LABELS
+NAME,TYPE,COMMENT,PRIMARY_KEY,KEY_UNIQUE,NULLABLE,DEFAULT_VALUE,ENCODING,COMPRESSION,BLOCK_SIZE
+---- RESULTS
+'a','int','','true','false','false','','AUTO_ENCODING','DEFAULT_COMPRESSION','0'
+'b','string','','true','false','false','','AUTO_ENCODING','DEFAULT_COMPRESSION','0'
+'auto_incrementing_id','bigint','','true','false','false','','AUTO_ENCODING','DEFAULT_COMPRESSION','0'
+'c','float','','false','','true','','AUTO_ENCODING','DEFAULT_COMPRESSION','0'
+---- TYPES
+STRING,STRING,STRING,STRING,STRING,STRING,STRING,STRING,STRING,STRING
+====
\ No newline at end of file
diff --git a/testdata/workloads/functional-query/queries/QueryTest/kudu_hms_alter.test b/testdata/workloads/functional-query/queries/QueryTest/kudu_hms_alter.test
index d29f592f0..8d1cdc323 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/kudu_hms_alter.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/kudu_hms_alter.test
@@ -567,16 +567,16 @@ alter table kudu_tbl_to_alter alter column new_col4
   set encoding plain_encoding compression lz4 block_size 1000;
 describe kudu_tbl_to_alter;
 ---- LABELS
-NAME,TYPE,COMMENT,PRIMARY_KEY,NULLABLE,DEFAULT_VALUE,ENCODING,COMPRESSION,BLOCK_SIZE
----- RESULTS
-'id','int','','true','false','','RLE','SNAPPY','100'
-'last_name','string','','false','true','','AUTO_ENCODING','DEFAULT_COMPRESSION','0'
-'name','string','','false','true','name_default','AUTO_ENCODING','DEFAULT_COMPRESSION','0'
-'new_col1','int','','false','false','','AUTO_ENCODING','DEFAULT_COMPRESSION','0'
-'new_col2','bigint','','false','false','','AUTO_ENCODING','DEFAULT_COMPRESSION','0'
-'new_col4','int','','false','true','-1','PLAIN_ENCODING','LZ4','1000'
+NAME,TYPE,COMMENT,PRIMARY_KEY,KEY_UNIQUE,NULLABLE,DEFAULT_VALUE,ENCODING,COMPRESSION,BLOCK_SIZE
+---- RESULTS
+'id','int','','true','true','false','','RLE','SNAPPY','100'
+'last_name','string','','false','','true','','AUTO_ENCODING','DEFAULT_COMPRESSION','0'
+'name','string','','false','','true','name_default','AUTO_ENCODING','DEFAULT_COMPRESSION','0'
+'new_col1','int','','false','','false','','AUTO_ENCODING','DEFAULT_COMPRESSION','0'
+'new_col2','bigint','','false','','false','','AUTO_ENCODING','DEFAULT_COMPRESSION','0'
+'new_col4','int','','false','','true','-1','PLAIN_ENCODING','LZ4','1000'
 ---- TYPES
-STRING,STRING,STRING,STRING,STRING,STRING,STRING,STRING,STRING
+STRING,STRING,STRING,STRING,STRING,STRING,STRING,STRING,STRING,STRING
 ====
 ---- QUERY
 # check that we can insert and scan after the storage attribute changes
diff --git a/testdata/workloads/functional-query/queries/QueryTest/kudu_insert.test b/testdata/workloads/functional-query/queries/QueryTest/kudu_insert.test
index 3e1a42599..18423b744 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/kudu_insert.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/kudu_insert.test
@@ -534,3 +534,149 @@ select o_orderkey from tpch_kudu.orders;
 NumModifiedRows: 1500000
 NumRowErrors: 0
 ====
+---- QUERY
+# Create Kudu table with non unique primary key
+create table insert_non_unique_key_test_tbl1
+  (id int non unique primary key, vali bigint null, valv string null)
+  PARTITION BY RANGE (PARTITION VALUES < 10, PARTITION 10 <= VALUES < 30,
+  PARTITION 30 <= VALUES) STORED AS KUDU
+---- RESULTS
+'Table has been created.'
+====
+---- QUERY
+# Insert VALUES with a single row
+insert into insert_non_unique_key_test_tbl1 values (1, 1, 'one')
+---- RUNTIME_PROFILE
+NumModifiedRows: 1
+NumRowErrors: 0
+---- LABELS
+ID, VALI, VALV
+---- DML_RESULTS: insert_non_unique_key_test_tbl1
+1,1,'one'
+---- TYPES
+INT,BIGINT,STRING
+====
+---- QUERY
+# Insert VALUES with multiple rows
+insert into insert_non_unique_key_test_tbl1
+(id, vali, valv) values (2, 2, 'two'), (3, 3, 'three')
+---- RUNTIME_PROFILE
+NumModifiedRows: 2
+NumRowErrors: 0
+====
+---- QUERY
+# Try to insert a row with a value for the auto_incrementing_id column
+insert into insert_non_unique_key_test_tbl1
+(id, auto_incrementing_id, vali, valv) values (4, 4, 4, 'four')
+---- CATCH
+auto-incrementing column is incorrectly set
+====
+---- QUERY
+# Try to insert a row with a value for the auto_incrementing_id column
+insert into insert_non_unique_key_test_tbl1
+(id, auto_incrementing_id) values (5, 50000)
+---- CATCH
+auto-incrementing column is incorrectly set
+====
+---- QUERY
+# Try to insert a row without a value for the key column
+insert into insert_non_unique_key_test_tbl1
+(vali, valv) values (6, 'six')
+---- CATCH
+AnalysisException: All primary key columns must be specified for INSERTing into Kudu tables. Missing columns are: id
+====
+---- QUERY
+# Insert with SELECT from another table
+insert into insert_non_unique_key_test_tbl1
+select id, bigint_col, string_col from functional.alltypes where id = 10
+---- RUNTIME_PROFILE
+NumModifiedRows: 1
+NumRowErrors: 0
+====
+---- QUERY
+select * from insert_non_unique_key_test_tbl1 order by id;
+---- RESULTS
+1,1,'one'
+2,2,'two'
+3,3,'three'
+10,0,'0'
+---- TYPES
+INT,BIGINT,STRING
+====
+---- QUERY
+# Create Kudu table in CTAS statement with non unique primary key
+create table insert_non_unique_key_test_tbl2 non unique primary key (id)
+  partition by hash (id) partitions 3
+  stored as kudu
+  as select * from insert_non_unique_key_test_tbl1;
+select * from insert_non_unique_key_test_tbl2 order by id;
+---- RESULTS
+1,1,'one'
+2,2,'two'
+3,3,'three'
+10,0,'0'
+---- TYPES
+INT,BIGINT,STRING
+====
+---- QUERY
+# Create Kudu table with non unique primary key
+create table insert_non_unique_key_test_tbl3
+  (id int non unique primary key, vali bigint null, valv string null)
+  partition by hash (id) partitions 3 stored as kudu
+---- RESULTS
+'Table has been created.'
+====
+---- QUERY
+# Insert with SELECT from another table
+insert into insert_non_unique_key_test_tbl3
+  select * from insert_non_unique_key_test_tbl2
+---- RUNTIME_PROFILE
+NumModifiedRows: 4
+NumRowErrors: 0
+====
+---- QUERY
+select * from insert_non_unique_key_test_tbl3 order by id;
+---- RESULTS
+1,1,'one'
+2,2,'two'
+3,3,'three'
+10,0,'0'
+---- TYPES
+INT,BIGINT,STRING
+====
+---- QUERY
+# Create Kudu table with non unique primary key
+create table insert_non_unique_key_test_tbl4
+  (id int non unique primary key, vali bigint null, valv string null)
+  partition by range (id)
+    (partition value = 0, partition value = 1,
+     partition value = 2, partition value = 3)
+  stored as kudu
+---- RESULTS
+'Table has been created.'
+====
+---- QUERY
+# Insert two rows
+insert into insert_non_unique_key_test_tbl4 values (1, 1, 'one'), (2, 2, 'two');
+---- RUNTIME_PROFILE
+NumModifiedRows: 2
+NumRowErrors: 0
+====
+---- QUERY
+# Insert two rows with duplicate values for the non unique primary key
+insert into insert_non_unique_key_test_tbl4 values (1, 10, 'ten'), (1, 11, 'eleven');
+---- RUNTIME_PROFILE
+NumModifiedRows: 2
+NumRowErrors: 0
+====
+---- QUERY
+select id, vali, valv, auto_incrementing_id from insert_non_unique_key_test_tbl4
+  order by id, auto_incrementing_id;
+---- RESULTS
+1,1,'one',1
+1,10,'ten',2
+1,11,'eleven',3
+2,2,'two',1
+---- TYPES
+INT,BIGINT,STRING,BIGINT
+====
\ No newline at end of file
diff --git a/testdata/workloads/functional-query/queries/QueryTest/kudu_partition_ddl.test b/testdata/workloads/functional-query/queries/QueryTest/kudu_partition_ddl.test
index 71bcac72f..d438c0402 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/kudu_partition_ddl.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/kudu_partition_ddl.test
@@ -12,14 +12,14 @@ show partitions simple_hash
 ---- LABELS
 Start Key,Stop Key,Leader Replica,#Replicas
 ---- RESULTS
-'','0000000000000001',regex:.*?:\d+,3
-'0000000000000001','00000001',regex:.*?:\d+,3
-'00000001','0000000100000001',regex:.*?:\d+,3
-'0000000100000001','00000002',regex:.*?:\d+,3
-'00000002','0000000200000001',regex:.*?:\d+,3
-'0000000200000001','00000003',regex:.*?:\d+,3
-'00000003','0000000300000001',regex:.*?:\d+,3
-'0000000300000001','',regex:.*?:\d+,3
+'0000000000000000','0000000000000001',regex:.*?:\d+,3
+'0000000000000001','0000000000000002',regex:.*?:\d+,3
+'0000000100000000','0000000100000001',regex:.*?:\d+,3
+'0000000100000001','0000000100000002',regex:.*?:\d+,3
+'0000000200000000','0000000200000001',regex:.*?:\d+,3
+'0000000200000001','0000000200000002',regex:.*?:\d+,3
+'0000000300000000','0000000300000001',regex:.*?:\d+,3
+'0000000300000001','0000000300000002',regex:.*?:\d+,3
 ---- TYPES
 STRING,STRING,STRING,INT
 ====
@@ -196,10 +196,10 @@ show partitions simple_hash_all_columns
 ---- LABELS
 Start Key,Stop Key,Leader Replica,#Replicas
 ---- RESULTS
-'','00000001',regex:.*?:\d+,3
+'00000000','00000001',regex:.*?:\d+,3
 '00000001','00000002',regex:.*?:\d+,3
 '00000002','00000003',regex:.*?:\d+,3
-'00000003','',regex:.*?:\d+,3
+'00000003','00000004',regex:.*?:\d+,3
 ---- TYPES
 STRING,STRING,STRING,INT
 ====
diff --git a/testdata/workloads/functional-query/queries/QueryTest/kudu_stats.test b/testdata/workloads/functional-query/queries/QueryTest/kudu_stats.test
index f3794a989..caedee0e2 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/kudu_stats.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/kudu_stats.test
@@ -45,10 +45,61 @@ Start Key,Stop Key,Leader Replica,#Replicas
 compute stats simple;
 describe simple;
 ---- RESULTS
-'id','int','','true','false','','AUTO_ENCODING','DEFAULT_COMPRESSION','0'
-'name','string','','false','true','','AUTO_ENCODING','DEFAULT_COMPRESSION','0'
-'valf','float','','false','true','','AUTO_ENCODING','DEFAULT_COMPRESSION','0'
-'vali','bigint','','false','true','','AUTO_ENCODING','DEFAULT_COMPRESSION','0'
+'id','int','','true','true','false','','AUTO_ENCODING','DEFAULT_COMPRESSION','0'
+'name','string','','false','','true','','AUTO_ENCODING','DEFAULT_COMPRESSION','0'
+'valf','float','','false','','true','','AUTO_ENCODING','DEFAULT_COMPRESSION','0'
+'vali','bigint','','false','','true','','AUTO_ENCODING','DEFAULT_COMPRESSION','0'
 ---- TYPES
-STRING,STRING,STRING,STRING,STRING,STRING,STRING,STRING,STRING
+STRING,STRING,STRING,STRING,STRING,STRING,STRING,STRING,STRING,STRING
 ====
+---- QUERY
+# Create Kudu table with non unique primary key
+create table non_unique_key_stats_test (a int, b string, non unique primary key(a))
+  partition by range (partition values < 10, partition 10 <= values < 30,
+  partition 30 <= values) stored as kudu tblproperties('kudu.num_tablet_replicas' = '1');
+---- RESULTS
+'Table has been created.'
+====
+---- QUERY
+# Tests the SHOW TABLE STATS output without stats computed
+show table stats non_unique_key_stats_test;
+---- RESULTS
+-1,3,regex:.*,'KUDU',regex:.*
+---- TYPES
+BIGINT,BIGINT,STRING,STRING,STRING
+---- LABELS
+#Rows,#Partitions,Size,Format,Location
+====
+---- QUERY
+# Tests the SHOW TABLE STATS output after stats computed
+compute stats non_unique_key_stats_test;
+show table stats non_unique_key_stats_test;
+---- RESULTS
+0,3,regex:.*,'KUDU',regex:.*
+---- TYPES
+BIGINT,BIGINT,STRING,STRING,STRING
+---- LABELS
+#Rows,#Partitions,Size,Format,Location
+====
+---- QUERY
+# Tests the SHOW PARTITIONS output
+show partitions non_unique_key_stats_test;
+---- RESULTS
+'','8000000A',regex:.*?:\d+,1
+'8000000A','8000001E',regex:.*?:\d+,1
+'8000001E','',regex:.*?:\d+,1
+---- TYPES
+STRING,STRING,STRING,INT
+---- LABELS
+Start Key,Stop Key,Leader Replica,#Replicas
+====
+---- QUERY
+compute stats non_unique_key_stats_test;
+describe non_unique_key_stats_test;
+---- RESULTS
+'a','int','','true','false','false','','AUTO_ENCODING','DEFAULT_COMPRESSION','0'
+'auto_incrementing_id','bigint','','true','false','false','','AUTO_ENCODING','DEFAULT_COMPRESSION','0'
+'b','string','','false','','true','','AUTO_ENCODING','DEFAULT_COMPRESSION','0'
+---- TYPES
+STRING,STRING,STRING,STRING,STRING,STRING,STRING,STRING,STRING,STRING
+====
\ No newline at end of file
diff --git a/testdata/workloads/functional-query/queries/QueryTest/kudu_update.test b/testdata/workloads/functional-query/queries/QueryTest/kudu_update.test
index fb9100855..2541d1288 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/kudu_update.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/kudu_update.test
@@ -358,3 +358,98 @@ update tdata set vali = -1
 NumModifiedRows: 7300
 NumRowErrors: 0
 ====
+---- QUERY
+# Create Kudu table with non unique primary key
+create table update_non_unique_key_test non unique primary key (id)
+partition by hash (id) partitions 3 stored as kudu
+as select id, int_col from functional.alltypestiny;
+select * from update_non_unique_key_test order by id;
+---- RESULTS
+0,0
+1,1
+2,0
+3,1
+4,0
+5,1
+6,0
+7,1
+---- TYPES
+INT,INT
+====
+---- QUERY
+# Test an UPDATE on a table with non unique primary key
+update update_non_unique_key_test set int_col = -1 where id < 3;
+---- RUNTIME_PROFILE
+NumModifiedRows: 3
+NumRowErrors: 0
+====
+---- QUERY
+select * from update_non_unique_key_test order by id;
+---- RESULTS
+0,-1
+1,-1
+2,-1
+3,1
+4,0
+5,1
+6,0
+7,1
+---- TYPES
+INT,INT
+====
+---- QUERY
+# Test an UPDATE of a key column
+update update_non_unique_key_test set id = -1 where int_col = 1;
+---- CATCH
+AnalysisException: Key column 'id' cannot be updated.
+====
+---- QUERY
+# Test an UPDATE of a non key column
+update update_non_unique_key_test set int_col = -2 where int_col = 0;
+---- RUNTIME_PROFILE
+NumModifiedRows: 2
+NumRowErrors: 0
+====
+---- QUERY
+select * from update_non_unique_key_test order by id;
+---- RESULTS
+0,-1
+1,-1
+2,-1
+3,1
+4,-2
+5,1
+6,-2
+7,1
+---- TYPES
+INT,INT
+====
+---- QUERY
+# Test an UPDATE of the auto_incrementing_id column
+update update_non_unique_key_test set auto_incrementing_id = 100 where id = 1;
+---- CATCH
+AnalysisException: System generated key column 'auto_incrementing_id' cannot be updated.
+====
+---- QUERY
+# Test an UPDATE with the auto-incrementing column in the WHERE clause
+update update_non_unique_key_test set int_col = 0
+where id = 0 and auto_incrementing_id < 10;
+---- RUNTIME_PROFILE
+NumModifiedRows: 1
+NumRowErrors: 0
+====
+---- QUERY
+select id, int_col from update_non_unique_key_test
+group by id, int_col, auto_incrementing_id order by id;
+---- RESULTS
+0,0
+1,-1
+2,-1
+3,1
+4,-2
+5,1
+6,-2
+7,1
+---- TYPES
+INT,INT
+====
\ No newline at end of file
diff --git a/testdata/workloads/functional-query/queries/QueryTest/kudu_upsert.test b/testdata/workloads/functional-query/queries/QueryTest/kudu_upsert.test
index dd790cfbc..303d065cc 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/kudu_upsert.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/kudu_upsert.test
@@ -528,3 +528,18 @@ upsert into table multiple_key_cols
 ---- CATCH
 All primary key columns must be specified for UPSERTing into Kudu tables. Missing columns are: bigint_col
 ====
+---- QUERY
+# Create Kudu table with non unique primary key
+create table upsert_non_unique_key_test (id int non unique primary key, name string)
+partition by hash (id) partitions 3
+stored as kudu
+---- RESULTS
+'Table has been created.'
+====
+---- QUERY
+upsert into upsert_non_unique_key_test
+(id, name) values
+(1, 'one'), (2, 'two'), (3, 'three')
+---- CATCH
+UPSERT is not supported for Kudu tables with auto-incrementing column
+====
\ No newline at end of file
diff --git a/tests/custom_cluster/test_kudu.py b/tests/custom_cluster/test_kudu.py
index 9759c331f..d05517061 100644
--- a/tests/custom_cluster/test_kudu.py
+++ b/tests/custom_cluster/test_kudu.py
@@ -318,7 +318,7 @@ class TestKuduHMSIntegration(CustomKuduTest):
         external_table_name, props))
       cursor.execute("DESCRIBE %s" % (external_table_name))
       assert cursor.fetchall() == \
-             [("a", "int", "", "true", "false", "", "AUTO_ENCODING",
+             [("a", "int", "", "true", "true", "false", "", "AUTO_ENCODING",
                "DEFAULT_COMPRESSION", "0")]
       # Drop the underlying Kudu table
       kudu_client.delete_table(kudu_table.name)
diff --git a/tests/metadata/test_ddl_base.py b/tests/metadata/test_ddl_base.py
index 63409ea6c..068d34bce 100644
--- a/tests/metadata/test_ddl_base.py
+++ b/tests/metadata/test_ddl_base.py
@@ -123,7 +123,7 @@ class TestDdlBase(ImpalaTestSuite):
     comments = dict()
     for row in result.data:
       cols = row.split('\t')
-      if len(cols) <= 9:
+      if len(cols) <= 10:
         comments[cols[0].rstrip()] = cols[2].rstrip()
     return comments.get(col_name)
 
diff --git a/tests/query_test/test_kudu.py b/tests/query_test/test_kudu.py
index 05f67f03f..f9f728900 100644
--- a/tests/query_test/test_kudu.py
+++ b/tests/query_test/test_kudu.py
@@ -664,8 +664,8 @@ class TestCreateExternalTable(KuduTestSuite):
         table_desc = [[col.strip() if col else col for col in row] for row in cursor]
         # Pytest shows truncated output on failure, so print the details just in case.
         LOG.info(table_desc)
-        assert ["ts", "timestamp", "", "false", "true", "1230768000000000", \
-          "AUTO_ENCODING", "DEFAULT_COMPRESSION", "0"] in table_desc
+        assert ["ts", "timestamp", "", "false", "", "true", "1230768000000000",
+            "AUTO_ENCODING", "DEFAULT_COMPRESSION", "0"] in table_desc
     finally:
       if kudu_client.table_exists(name):
         kudu_client.delete_table(name)
@@ -705,7 +705,7 @@ class TestCreateExternalTable(KuduTestSuite):
       with self.drop_impala_table_after_context(cursor, impala_table_name):
         cursor.execute("DESCRIBE %s" % impala_table_name)
         kudu_schema = kudu_table.schema
-        for i, (col_name, col_type, _, _, _, _, _, _, _) in enumerate(cursor):
+        for i, (col_name, col_type, _, _, _, _, _, _, _, _) in enumerate(cursor):
           kudu_col = kudu_schema[i]
           assert col_name == kudu_col.name
           assert col_type.upper() == \
@@ -776,7 +776,7 @@ class TestCreateExternalTable(KuduTestSuite):
         with self.drop_impala_table_after_context(cursor, impala_table_name):
           cursor.execute("DESCRIBE %s" % impala_table_name)
           assert cursor.fetchall() == \
-              [("a", "bigint", "", "true", "false", "", "AUTO_ENCODING",
+              [("a", "bigint", "", "true", "true", "false", "", "AUTO_ENCODING",
                 "DEFAULT_COMPRESSION", "0")]
 
   @SkipIfKudu.hms_integration_enabled
@@ -1266,7 +1266,7 @@ class TestImpalaKuduIntegration(KuduTestSuite):
           impala_table_name, props))
       cursor.execute("DESCRIBE %s" % (impala_table_name))
       assert cursor.fetchall() == \
-          [("a", "int", "", "true", "false", "", "AUTO_ENCODING",
+          [("a", "int", "", "true", "true", "false", "", "AUTO_ENCODING",
             "DEFAULT_COMPRESSION", "0")]
 
       # Drop the underlying Kudu table and replace it with another Kudu table that has
@@ -1284,9 +1284,9 @@ class TestImpalaKuduIntegration(KuduTestSuite):
         cursor.execute("REFRESH %s" % (impala_table_name))
         cursor.execute("DESCRIBE %s" % (impala_table_name))
         assert cursor.fetchall() == \
-            [("b", "string", "", "true", "false", "", "AUTO_ENCODING",
+            [("b", "string", "", "true", "true", "false", "", "AUTO_ENCODING",
               "DEFAULT_COMPRESSION", "0"),
-             ("c", "string", "", "false", "true", "", "AUTO_ENCODING",
+             ("c", "string", "", "false", "", "true", "", "AUTO_ENCODING",
               "DEFAULT_COMPRESSION", "0")]
 
   @SkipIfKudu.hms_integration_enabled
@@ -1302,7 +1302,7 @@ class TestImpalaKuduIntegration(KuduTestSuite):
           impala_table_name, props))
       cursor.execute("DESCRIBE %s" % (impala_table_name))
       assert cursor.fetchall() == \
-          [("a", "int", "", "true", "false", "", "AUTO_ENCODING",
+          [("a", "int", "", "true", "true", "false", "", "AUTO_ENCODING",
             "DEFAULT_COMPRESSION", "0")]
       # Drop the underlying Kudu table
       kudu_client.delete_table(kudu_table.name)