You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2016/10/22 05:33:32 UTC

[05/14] incubator-impala git commit: IMPALA-3719: Simplify CREATE TABLE statements with Kudu tables

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/041fa6d9/fe/src/main/jflex/sql-scanner.flex
----------------------------------------------------------------------
diff --git a/fe/src/main/jflex/sql-scanner.flex b/fe/src/main/jflex/sql-scanner.flex
index a0a0122..def0be2 100644
--- a/fe/src/main/jflex/sql-scanner.flex
+++ b/fe/src/main/jflex/sql-scanner.flex
@@ -145,6 +145,7 @@ import org.apache.impala.analysis.SqlParserSymbols;
     keywordMap.put("iregexp", new Integer(SqlParserSymbols.KW_IREGEXP));
     keywordMap.put("is", new Integer(SqlParserSymbols.KW_IS));
     keywordMap.put("join", new Integer(SqlParserSymbols.KW_JOIN));
+    keywordMap.put("kudu", new Integer(SqlParserSymbols.KW_KUDU));
     keywordMap.put("last", new Integer(SqlParserSymbols.KW_LAST));
     keywordMap.put("left", new Integer(SqlParserSymbols.KW_LEFT));
     keywordMap.put("like", new Integer(SqlParserSymbols.KW_LIKE));
@@ -173,6 +174,7 @@ import org.apache.impala.analysis.SqlParserSymbols;
     keywordMap.put("partitions", new Integer(SqlParserSymbols.KW_PARTITIONS));
     keywordMap.put("preceding", new Integer(SqlParserSymbols.KW_PRECEDING));
     keywordMap.put("prepare_fn", new Integer(SqlParserSymbols.KW_PREPARE_FN));
+    keywordMap.put("primary", new Integer(SqlParserSymbols.KW_PRIMARY));
     keywordMap.put("produced", new Integer(SqlParserSymbols.KW_PRODUCED));
     keywordMap.put("purge", new Integer(SqlParserSymbols.KW_PURGE));
     keywordMap.put("range", new Integer(SqlParserSymbols.KW_RANGE));

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/041fa6d9/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java b/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java
index c47135e..e79c5ab 100644
--- a/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java
@@ -25,20 +25,15 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.UUID;
 
-import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsAction;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.junit.Assert;
-import org.junit.Test;
+import junit.framework.Assert;
 
+import org.apache.impala.analysis.CreateTableStmt;
 import org.apache.impala.catalog.ArrayType;
 import org.apache.impala.catalog.CatalogException;
 import org.apache.impala.catalog.ColumnStats;
 import org.apache.impala.catalog.DataSource;
 import org.apache.impala.catalog.DataSourceTable;
+import org.apache.impala.catalog.KuduTable;
 import org.apache.impala.catalog.PrimitiveType;
 import org.apache.impala.catalog.ScalarType;
 import org.apache.impala.catalog.StructField;
@@ -50,10 +45,21 @@ import org.apache.impala.common.FrontendTestBase;
 import org.apache.impala.common.RuntimeEnv;
 import org.apache.impala.testutil.TestUtils;
 import org.apache.impala.util.MetaStoreUtil;
+
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+
+import org.junit.Test;
+
 public class AnalyzeDDLTest extends FrontendTestBase {
 
   @Test
@@ -1233,6 +1239,11 @@ public class AnalyzeDDLTest extends FrontendTestBase {
     AnalysisError("create table if not exists functional.zipcode_incomes like parquet "
         + "'/test-warehouse/schemas/malformed_decimal_tiny.parquet'",
         "Unsupported parquet type FIXED_LEN_BYTE_ARRAY for field c1");
+
+    // CREATE TABLE LIKE FILE is not supported for Kudu tables
+    AnalysisError("create table newtbl_kudu like parquet " +
+        "'/test-warehouse/schemas/alltypestiny.parquet' stored as kudu",
+        "CREATE TABLE LIKE FILE statement is not supported for Kudu tables.");
   }
 
   @Test
@@ -1278,11 +1289,11 @@ public class AnalyzeDDLTest extends FrontendTestBase {
 
     // Unsupported file formats
     AnalysisError("create table foo stored as sequencefile as select 1",
-        "CREATE TABLE AS SELECT does not support (SEQUENCEFILE) file format. " +
-         "Supported formats are: (PARQUET, TEXTFILE)");
+        "CREATE TABLE AS SELECT does not support the (SEQUENCEFILE) file format. " +
+         "Supported formats are: (PARQUET, TEXTFILE, KUDU)");
     AnalysisError("create table foo stored as RCFILE as select 1",
-        "CREATE TABLE AS SELECT does not support (RCFILE) file format. " +
-         "Supported formats are: (PARQUET, TEXTFILE)");
+        "CREATE TABLE AS SELECT does not support the (RCFILE) file format. " +
+         "Supported formats are: (PARQUET, TEXTFILE, KUDU)");
 
     // CTAS with a WITH clause and inline view (IMPALA-1100)
     AnalyzesOk("create table test_with as with with_1 as (select 1 as int_col from " +
@@ -1330,6 +1341,17 @@ public class AnalyzeDDLTest extends FrontendTestBase {
     AnalysisError("create table p partitioned by (tinyint_col, int_col) as " +
         "select double_col, int_col, tinyint_col from functional.alltypes",
         "Partition column name mismatch: tinyint_col != int_col");
+
+    // CTAS into managed Kudu tables
+    AnalyzesOk("create table t primary key (id) distribute by hash (id) into 3 buckets" +
+        " stored as kudu as select id, bool_col, tinyint_col, smallint_col, int_col, " +
+        "bigint_col, float_col, double_col, date_string_col, string_col " +
+        "from functional.alltypestiny");
+    // CTAS into an external Kudu table is not supported
+    AnalysisError("create external table t stored as kudu " +
+        "tblproperties('kudu.table_name'='t') as select id, int_col from " +
+        "functional.alltypestiny", "CREATE TABLE AS SELECT is not supported for " +
+        "external Kudu tables.");
   }
 
   @Test
@@ -1376,6 +1398,12 @@ public class AnalyzeDDLTest extends FrontendTestBase {
         "No FileSystem for scheme: foofs");
     AnalysisError("create table functional.baz like functional.alltypes location '  '",
         "URI path cannot be empty.");
+
+    // CREATE TABLE LIKE is not currently supported for Kudu tables (see IMPALA-4052)
+    AnalysisError("create table kudu_tbl like functional.alltypestiny stored as kudu",
+        "CREATE TABLE LIKE is not supported for Kudu tables");
+    AnalysisError("create table tbl like functional_kudu.dimtbl", "Cloning a Kudu " +
+        "table using CREATE TABLE LIKE is not supported.");
   }
 
   @Test
@@ -1458,12 +1486,18 @@ public class AnalyzeDDLTest extends FrontendTestBase {
     String [] fileFormats =
         {"TEXTFILE", "SEQUENCEFILE", "PARQUET", "PARQUETFILE", "RCFILE"};
     for (String format: fileFormats) {
-      AnalyzesOk(String.format("create table new_table (i int) " +
-          "partitioned by (d decimal) comment 'c' stored as %s", format));
-      // No column definitions.
-      AnalysisError(String.format("create table new_table " +
-          "partitioned by (d decimal) comment 'c' stored as %s", format),
-          "Table requires at least 1 column");
+      for (String create: ImmutableList.of("create table", "create external table")) {
+        AnalyzesOk(String.format("%s new_table (i int) " +
+            "partitioned by (d decimal) comment 'c' stored as %s", create, format));
+        // No column definitions.
+        AnalysisError(String.format("%s new_table " +
+            "partitioned by (d decimal) comment 'c' stored as %s", create, format),
+            "Table requires at least 1 column");
+      }
+      AnalysisError(String.format("create table t (i int primary key) stored as %s",
+          format), "Only Kudu tables can specify a PRIMARY KEY");
+      AnalysisError(String.format("create table t (i int, primary key(i)) stored as %s",
+          format), "Only Kudu tables can specify a PRIMARY KEY");
     }
 
     // Note: Backslashes need to be escaped twice - once for Java and once for Impala.
@@ -1541,7 +1575,7 @@ public class AnalyzeDDLTest extends FrontendTestBase {
     AnalysisError("create table cached_tbl(i int) location " +
         "'file:///test-warehouse/cache_tbl' cached in 'testPool'",
         "Location 'file:/test-warehouse/cache_tbl' cannot be cached. " +
-        "Please retry without caching: CREATE TABLE default.cached_tbl ... UNCACHED");
+        "Please retry without caching: CREATE TABLE ... UNCACHED");
 
     // Invalid database name.
     AnalysisError("create table `???`.new_table (x int) PARTITIONED BY (y int)",
@@ -1668,175 +1702,179 @@ public class AnalyzeDDLTest extends FrontendTestBase {
   }
 
   @Test
-  public void TestCreateKuduTable() {
+  public void TestCreateManagedKuduTable() {
     TestUtils.assumeKuduIsSupported();
-    // Create Kudu Table with all required properties
-    AnalyzesOk("create table tab (x int) " +
-        "distribute by hash into 2 buckets tblproperties (" +
-        "'storage_handler'='com.cloudera.kudu.hive.KuduStorageHandler', " +
-        "'kudu.table_name'='tab'," +
-        "'kudu.master_addresses' = '127.0.0.1:8080, 127.0.0.1:8081', " +
-        "'kudu.key_columns' = 'a,b,c'" +
-        ")");
-
-    // Check that all properties are present
-    AnalysisError("create table tab (x int) " +
-        "distribute by hash into 2 buckets tblproperties (" +
-        "'storage_handler'='com.cloudera.kudu.hive.KuduStorageHandler', " +
-        "'kudu.master_addresses' = '127.0.0.1:8080', " +
-        "'kudu.key_columns' = 'a,b,c'" +
-        ")",
-        "Kudu table is missing parameters in table properties. Please verify " +
-        "if kudu.table_name, kudu.master_addresses, and kudu.key_columns are " +
-        "present and have valid values.");
-
-    AnalysisError("create table tab (x int) " +
-            "distribute by hash into 2 buckets tblproperties (" +
-            "'storage_handler'='com.cloudera.kudu.hive.KuduStorageHandler', " +
-            "'kudu.table_name'='tab'," +
-            "'kudu.key_columns' = 'a,b,c'"
-            + ")",
-        "Kudu table is missing parameters in table properties. Please verify " +
-            "if kudu.table_name, kudu.master_addresses, and kudu.key_columns are " +
-            "present and have valid values.");
-
-    AnalysisError("create table tab (x int) " +
-        "distribute by hash into 2 buckets tblproperties (" +
-        "'storage_handler'='com.cloudera.kudu.hive.KuduStorageHandler', " +
-        "'kudu.table_name'='tab'," +
-        "'kudu.master_addresses' = '127.0.0.1:8080'" +
-        ")",
-        "Kudu table is missing parameters in table properties. Please verify " +
-        "if kudu.table_name, kudu.master_addresses, and kudu.key_columns are " +
-        "present and have valid values.");
-
-    // Check that properties are not empty
-    AnalysisError("create table tab (x int) " +
-            "distribute by hash into 2 buckets tblproperties (" +
-            "'storage_handler'='com.cloudera.kudu.hive.KuduStorageHandler', " +
-            "'kudu.table_name'=''," +
-            "'kudu.master_addresses' = '127.0.0.1:8080', " +
-            "'kudu.key_columns' = 'a,b,c'" +
-            ")",
-        "Kudu table is missing parameters in table properties. Please verify " +
-            "if kudu.table_name, kudu.master_addresses, and kudu.key_columns are " +
-            "present and have valid values.");
-
-    AnalysisError("create table tab (x int) " +
-            "distribute by hash into 2 buckets tblproperties (" +
-            "'storage_handler'='com.cloudera.kudu.hive.KuduStorageHandler', " +
-            "'kudu.table_name'='asd'," +
-            "'kudu.master_addresses' = '', " +
-            "'kudu.key_columns' = 'a,b,c'" +
-            ")",
-        "Kudu table is missing parameters in table properties. Please verify " +
-            "if kudu.table_name, kudu.master_addresses, and kudu.key_columns are " +
-            "present and have valid values.");
-
-    // Don't allow caching
-    AnalysisError("create table tab (x int) cached in 'testPool' " +
-        "distribute by hash into 2 buckets tblproperties (" +
-        "'storage_handler'='com.cloudera.kudu.hive.KuduStorageHandler', " +
-        "'kudu.table_name'='tab'," +
-        "'kudu.master_addresses' = '127.0.0.1:8080', " +
-        "'kudu.key_columns' = 'a,b,c'" +
-        ")", "A Kudu table cannot be cached in HDFS.");
-
+    // Test primary keys and distribute by clauses
+    AnalyzesOk("create table tab (x int primary key) distribute by hash(x) " +
+        "into 8 buckets stored as kudu");
+    AnalyzesOk("create table tab (x int, primary key(x)) distribute by hash(x) " +
+        "into 8 buckets stored as kudu");
+    AnalyzesOk("create table tab (x int, y int, primary key (x, y)) " +
+        "distribute by hash(x, y) into 8 buckets stored as kudu");
+    AnalyzesOk("create table tab (x int, y int, primary key (x)) " +
+        "distribute by hash(x) into 8 buckets stored as kudu");
+    AnalyzesOk("create table tab (x int, y int, primary key(x, y)) " +
+        "distribute by hash(y) into 8 buckets stored as kudu");
+    // Multilevel partitioning. Data is split into 3 buckets based on 'x' and each
+    // bucket is partitioned into 4 tablets based on the split points of 'y'.
+    AnalyzesOk("create table tab (x int, y string, primary key(x, y)) " +
+        "distribute by hash(x) into 3 buckets, range(y) split rows " +
+        "(('aa'), ('bb'), ('cc')) stored as kudu");
+    // Key column in upper case
+    AnalyzesOk("create table tab (x int, y int, primary key (X)) " +
+        "distribute by hash (x) into 8 buckets stored as kudu");
     // Flexible Partitioning
-    AnalyzesOk("create table tab (a int, b int, c int, d int) " +
-        "distribute by hash(a,b) into 8 buckets, hash(c) into 2 buckets " +
-        "tblproperties (" +
-        "'storage_handler'='com.cloudera.kudu.hive.KuduStorageHandler', " +
-        "'kudu.table_name'='tab'," +
-        "'kudu.master_addresses' = '127.0.0.1:8080', " +
-        "'kudu.key_columns' = 'a,b,c'" +
-        ")");
-
-    AnalyzesOk("create table tab (a int, b int, c int, d int) " +
-        "distribute by hash into 8 buckets " +
-        "tblproperties (" +
-        "'storage_handler'='com.cloudera.kudu.hive.KuduStorageHandler', " +
-        "'kudu.table_name'='tab'," +
-        "'kudu.master_addresses' = '127.0.0.1:8080', " +
-        "'kudu.key_columns' = 'a,b,c'" +
-        ")");
-
-    // DISTRIBUTE BY is required for managed tables.
-    AnalysisError("create table tab (a int) tblproperties (" +
-        "'storage_handler'='com.cloudera.kudu.hive.KuduStorageHandler', " +
-        "'kudu.table_name'='tab'," +
-        "'kudu.master_addresses' = '127.0.0.1:8080', " +
-        "'kudu.key_columns' = 'a'" +
-        ")",
-        "A data distribution must be specified using the DISTRIBUTE BY clause.");
-
-    // DISTRIBUTE BY is not allowed for external tables.
-    AnalysisError("create external table tab (a int) " +
-        "distribute by hash into 3 buckets tblproperties (" +
-        "'storage_handler'='com.cloudera.kudu.hive.KuduStorageHandler', " +
-        "'kudu.table_name'='tab'," +
-        "'kudu.master_addresses' = '127.0.0.1:8080', " +
-        "'kudu.key_columns' = 'a'" +
-        ")",
-        "The DISTRIBUTE BY clause may not be specified for external tables.");
-
-    // Number of buckets must be larger 1
-    AnalysisError("create table tab (a int, b int, c int, d int) " +
-        "distribute by hash(a,b) into 8 buckets, hash(c) into 1 buckets " +
-        "tblproperties (" +
-        "'storage_handler'='com.cloudera.kudu.hive.KuduStorageHandler', " +
-        "'kudu.table_name'='tab'," +
-        "'kudu.master_addresses' = '127.0.0.1:8080', " +
-        "'kudu.key_columns' = 'a,b,c'" +
-        ")",
-        "Number of buckets in DISTRIBUTE BY clause 'HASH(c) INTO 1 BUCKETS' must " +
-            "be larger than 1");
-
-    // Key ranges must match the column types.
-    // TODO(kudu-merge) uncomment this when IMPALA-3156 is addressed.
-    //AnalysisError("create table tab (a int, b int, c int, d int) " +
-    //    "distribute by hash(a,b,c) into 8 buckets, " +
-    //    "range(a) split rows ((1),('abc'),(3)) " +
-    //    "tblproperties (" +
-    //    "'storage_handler'='com.cloudera.kudu.hive.KuduStorageHandler', " +
-    //    "'kudu.table_name'='tab'," +
-    //    "'kudu.master_addresses' = '127.0.0.1:8080', " +
-    //    "'kudu.key_columns' = 'a,b,c')");
-
-    // Distribute range data types are picked up during analysis and forwarded to Kudu
-    AnalyzesOk("create table tab (a int, b int, c int, d int) " +
-        "distribute by hash(a,b,c) into 8 buckets, " +
-        "range(a) split rows ((1),(2),(3)) " +
-        "tblproperties (" +
-        "'storage_handler'='com.cloudera.kudu.hive.KuduStorageHandler', " +
-        "'kudu.table_name'='tab'," +
-        "'kudu.master_addresses' = '127.0.0.1:8080', " +
-        "'kudu.key_columns' = 'a,b,c'" +
-        ")");
-
+    AnalyzesOk("create table tab (a int, b int, c int, d int, primary key (a, b, c))" +
+        "distribute by hash (a, b) into 8 buckets, hash(c) into 2 buckets stored as " +
+        "kudu");
+    // No columns specified in the DISTRIBUTE BY HASH clause
+    AnalyzesOk("create table tab (a int primary key, b int, c int, d int) " +
+        "distribute by hash into 8 buckets stored as kudu");
+    // Distribute range data types are picked up during analysis and forwarded to Kudu.
+    // Column names in distribute params should also be case-insensitive.
+    AnalyzesOk("create table tab (a int, b int, c int, d int, primary key(a, b, c, d))" +
+        "distribute by hash (a, B, c) into 8 buckets, " +
+        "range (A) split rows ((1),(2),(3)) stored as kudu");
+    // Allowing range distribution on a subset of the primary keys
+    AnalyzesOk("create table tab (id int, name string, valf float, vali bigint, " +
+        "primary key (id, name)) distribute by range (name) split rows (('abc')) " +
+        "stored as kudu");
+    // Null values in SPLIT ROWS
+    AnalysisError("create table tab (id int, name string, primary key(id, name)) " +
+        "distribute by hash (id) into 3 buckets, range (name) split rows ((null),(1)) " +
+        "stored as kudu", "Split values cannot be NULL. Split row: (NULL)");
+    // Primary key specified in tblproperties
+    AnalysisError(String.format("create table tab (x int) distribute by hash (x) " +
+        "into 8 buckets stored as kudu tblproperties ('%s' = 'x')",
+        KuduTable.KEY_KEY_COLUMNS), "PRIMARY KEY must be used instead of the table " +
+        "property");
+    // Primary key column that doesn't exist
+    AnalysisError("create table tab (x int, y int, primary key (z)) " +
+        "distribute by hash (x) into 8 buckets stored as kudu",
+        "PRIMARY KEY column 'z' does not exist in the table");
+    // Invalid composite primary key
+    AnalysisError("create table tab (x int primary key, primary key(x)) stored " +
+        "as kudu", "Multiple primary keys specified. Composite primary keys can " +
+        "be specified using the PRIMARY KEY (col1, col2, ...) syntax at the end " +
+        "of the column definition.");
+    AnalysisError("create table tab (x int primary key, y int primary key) stored " +
+        "as kudu", "Multiple primary keys specified. Composite primary keys can " +
+        "be specified using the PRIMARY KEY (col1, col2, ...) syntax at the end " +
+        "of the column definition.");
+    // Specifying the same primary key column multiple times
+    AnalysisError("create table tab (x int, primary key (x, x)) distribute by hash (x) " +
+        "into 8 buckets stored as kudu",
+        "Column 'x' is listed multiple times as a PRIMARY KEY.");
     // Each split row size should equals to the number of range columns.
-    AnalysisError("create table tab (a int, b int, c int, d int) " +
-        "distribute by range(a) split rows ((1,'extra_val'),(2),(3)) " +
-        "tblproperties (" +
-        "'storage_handler'='com.cloudera.kudu.hive.KuduStorageHandler', " +
-        "'kudu.table_name'='tab'," +
-        "'kudu.master_addresses' = '127.0.0.1:8080', " +
-        "'kudu.key_columns' = 'a,b,c'" +
-        ")",
+    AnalysisError("create table tab (a int, b int, c int, d int, primary key(a, b, c)) " +
+        "distribute by range(a) split rows ((1,'extra_val'),(2),(3)) stored as kudu",
         "SPLIT ROWS has different size than number of projected key columns: 1. " +
         "Split row: (1, 'extra_val')");
-
+    // Key ranges must match the column types.
+    AnalysisError("create table tab (a int, b int, c int, d int, primary key(a, b, c)) " +
+        "distribute by hash (a, b, c) into 8 buckets, " +
+        "range (a) split rows ((1), ('abc'), (3)) stored as kudu",
+        "Split value 'abc' (type: STRING) is not type compatible with column 'a'" +
+        " (type: INT).");
+    // Non-key column used in DISTRIBUTE BY
+    AnalysisError("create table tab (a int, b string, c bigint, primary key (a)) " +
+        "distribute by range (b) split rows (('abc')) stored as kudu",
+        "Column 'b' in 'RANGE (b) SPLIT ROWS (('abc'))' is not a key column. " +
+        "Only key columns can be used in DISTRIBUTE BY.");
     // No float split keys
-    AnalysisError("create table tab (a int, b int, c int, d int) " +
-            "distribute by hash(a,b,c) into 8 buckets, " +
-            "range(a) split rows ((1.2),('abc'),(3)) " +
-            "tblproperties (" +
-            "'storage_handler'='com.cloudera.kudu.hive.KuduStorageHandler', " +
-            "'kudu.table_name'='tab'," +
-            "'kudu.master_addresses' = '127.0.0.1:8080', " +
-            "'kudu.key_columns' = 'a,b,c'" +
-            ")",
-        "Only integral and string values allowed for split rows.");
+    AnalysisError("create table tab (a int, b int, c int, d int, primary key (a, b, c))" +
+        "distribute by hash (a, b, c) into 8 buckets, " +
+        "range (a) split rows ((1.2), ('abc'), (3)) stored as kudu",
+        "Split value 1.2 (type: DECIMAL(2,1)) is not type compatible with column 'a' " +
+        "(type: INT).");
+    // Non-existing column used in DISTRIBUTE BY
+    AnalysisError("create table tab (a int, b int, primary key (a, b)) " +
+        "distribute by range(unknown_column) split rows (('abc')) stored as kudu",
+        "Column 'unknown_column' in 'RANGE (unknown_column) SPLIT ROWS (('abc'))' " +
+        "is not a key column. Only key columns can be used in DISTRIBUTE BY");
+    // Kudu table name is specified in tblproperties
+    AnalyzesOk("create table tab (x int primary key) distribute by hash (x) " +
+        "into 8 buckets stored as kudu tblproperties ('kudu.table_name'='tab_1'," +
+        "'kudu.num_tablet_replicas'='1'," +
+        "'kudu.master_addresses' = '127.0.0.1:8080, 127.0.0.1:8081')");
+    // No port is specified in kudu master address
+    AnalyzesOk("create table tdata_no_port (id int primary key, name string, " +
+        "valf float, vali bigint) DISTRIBUTE BY RANGE SPLIT ROWS ((10), (30)) " +
+        "STORED AS KUDU tblproperties('kudu.master_addresses'='127.0.0.1')");
+    // Not using the STORED AS KUDU syntax to specify a Kudu table
+    AnalysisError("create table tab (x int primary key) tblproperties (" +
+        "'storage_handler'='com.cloudera.kudu.hive.KuduStorageHandler')",
+        CreateTableStmt.KUDU_STORAGE_HANDLER_ERROR_MESSAGE);
+    AnalysisError("create table tab (x int primary key) stored as kudu tblproperties (" +
+        "'storage_handler'='com.cloudera.kudu.hive.KuduStorageHandler')",
+        CreateTableStmt.KUDU_STORAGE_HANDLER_ERROR_MESSAGE);
+    // Invalid value for number of replicas
+    AnalysisError("create table t (x int primary key) stored as kudu tblproperties (" +
+        "'kudu.num_tablet_replicas'='1.1')",
+        "Table property 'kudu.num_tablet_replicas' must be an integer.");
+    // Don't allow caching
+    AnalysisError("create table tab (x int primary key) stored as kudu cached in " +
+        "'testPool'", "A Kudu table cannot be cached in HDFS.");
+    // LOCATION cannot be used with Kudu tables
+    AnalysisError("create table tab (a int primary key) distribute by hash (a) " +
+        "into 3 buckets stored as kudu location '/test-warehouse/'",
+        "LOCATION cannot be specified for a Kudu table.");
+    // DISTRIBUTE BY is required for managed tables.
+    AnalysisError("create table tab (a int, primary key (a)) stored as kudu",
+        "Table distribution must be specified for managed Kudu tables.");
+    AnalysisError("create table tab (a int) stored as kudu",
+        "A primary key is required for a Kudu table.");
+    // Using ROW FORMAT with a Kudu table
+    AnalysisError("create table tab (x int primary key) " +
+        "row format delimited escaped by 'X' stored as kudu",
+        "ROW FORMAT cannot be specified for file format KUDU.");
+    // Using PARTITIONED BY with a Kudu table
+    AnalysisError("create table tab (x int primary key) " +
+        "partitioned by (y int) stored as kudu", "PARTITIONED BY cannot be used " +
+        "in Kudu tables.");
+  }
+
+  @Test
+  public void TestCreateExternalKuduTable() {
+    AnalyzesOk("create external table t stored as kudu " +
+        "tblproperties('kudu.table_name'='t')");
+    // Use all allowed optional table props.
+    AnalyzesOk("create external table t stored as kudu tblproperties (" +
+        "'kudu.table_name'='tab'," +
+        "'kudu.master_addresses' = '127.0.0.1:8080, 127.0.0.1:8081')");
+    // Kudu table should be specified using the STORED AS KUDU syntax.
+    AnalysisError("create external table t tblproperties (" +
+        "'storage_handler'='com.cloudera.kudu.hive.KuduStorageHandler'," +
+        "'kudu.table_name'='t')",
+        CreateTableStmt.KUDU_STORAGE_HANDLER_ERROR_MESSAGE);
+    // Columns should not be specified in an external Kudu table
+    AnalysisError("create external table t (x int) stored as kudu " +
+        "tblproperties('kudu.table_name'='t')",
+        "Columns cannot be specified with an external Kudu table.");
+    // Primary keys cannot be specified in an external Kudu table
+    AnalysisError("create external table t (x int primary key) stored as kudu " +
+        "tblproperties('kudu.table_name'='t')", "Primary keys cannot be specified " +
+        "for an external Kudu table");
+    // Invalid syntax for specifying a Kudu table
+    AnalysisError("create external table t (x int) stored as parquet tblproperties (" +
+        "'storage_handler'='com.cloudera.kudu.hive.KuduStorageHandler'," +
+        "'kudu.table_name'='t')", CreateTableStmt.KUDU_STORAGE_HANDLER_ERROR_MESSAGE);
+    AnalysisError("create external table t stored as kudu tblproperties (" +
+        "'storage_handler'='foo', 'kudu.table_name'='t')",
+        CreateTableStmt.KUDU_STORAGE_HANDLER_ERROR_MESSAGE);
+    // Cannot specify the number of replicas for external Kudu tables
+    AnalysisError("create external table tab (x int) stored as kudu " +
+        "tblproperties ('kudu.num_tablet_replicas' = '1', " +
+        "'kudu.table_name'='tab')",
+        "Table property 'kudu.num_tablet_replicas' cannot be used with an external " +
+        "Kudu table.");
+    // Don't allow caching
+    AnalysisError("create external table t stored as kudu cached in 'testPool' " +
+        "tblproperties('kudu.table_name'='t')", "A Kudu table cannot be cached in HDFS.");
+    // LOCATION cannot be used for a Kudu table
+    AnalysisError("create external table t stored as kudu " +
+        "location '/test-warehouse' tblproperties('kudu.table_name'='t')",
+        "LOCATION cannot be specified for a Kudu table.");
   }
 
   @Test
@@ -2036,6 +2074,13 @@ public class AnalyzeDDLTest extends FrontendTestBase {
     AnalysisError("create table functional.new_table (i int) " +
         "partitioned by (x struct<f1:int>)",
         "Type 'STRUCT<f1:INT>' is not supported as partition-column type in column: x");
+
+    // Kudu-specific clauses used in an Avro table.
+    AnalysisError("create table functional.new_table (i int primary key) " +
+        "distribute by hash(i) into 3 buckets stored as avro",
+        "Only Kudu tables can use the DISTRIBUTE BY clause.");
+    AnalysisError("create table functional.new_table (i int primary key) " +
+        "stored as avro", "Only Kudu tables can specify a PRIMARY KEY.");
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/041fa6d9/fe/src/test/java/org/apache/impala/analysis/ParserTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/analysis/ParserTest.java b/fe/src/test/java/org/apache/impala/analysis/ParserTest.java
index 8b8ea42..6308293 100644
--- a/fe/src/test/java/org/apache/impala/analysis/ParserTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/ParserTest.java
@@ -2225,8 +2225,21 @@ public class ParserTest {
       // No column definitions.
       ParsesOk(String.format(
           "CREATE EXTERNAL TABLE Foo COMMENT 'c' STORED AS %s LOCATION '/b'", format));
+      ParserError(String.format("CREATE EXTERNAL TABLE t PRIMARY KEYS (i) STORED AS " +
+          "%s", format));
     }
 
+    ParsesOk("CREATE TABLE foo (i INT) STORED AS KUDU");
+    ParsesOk("CREATE TABLE foo (i INT PRIMARY KEY) STORED AS KUDU");
+    ParsesOk("CREATE TABLE foo (i INT, j INT, PRIMARY KEY (i, j)) STORED AS KUDU");
+    ParsesOk("CREATE TABLE foo (i INT, j INT, PRIMARY KEY (j, i)) STORED AS KUDU");
+    ParsesOk("CREATE TABLE foo (i INT PRIMARY KEY, PRIMARY KEY(i)) STORED AS KUDU");
+    ParsesOk("CREATE TABLE foo (i INT PRIMARY KEY, j INT PRIMARY KEY) STORED AS KUDU");
+    ParserError("CREATE TABLE foo (i INT) PRIMARY KEY (i) STORED AS KUDU");
+    ParserError("CREATE TABLE foo (i INT, PRIMARY KEY) STORED AS KUDU");
+    ParserError("CREATE TABLE foo (PRIMARY KEY(a), a INT) STORED AS KUDU");
+    ParserError("CREATE TABLE foo (i INT) PRIMARY KEY (i) STORED AS KUDU");
+
     // Table Properties
     String[] tblPropTypes = {"TBLPROPERTIES", "WITH SERDEPROPERTIES"};
     for (String propType: tblPropTypes) {
@@ -2383,6 +2396,7 @@ public class ParserTest {
     ParserError("CREATE TABLE Foo (i int) DISTRIBUTE BY RANGE(i) " +
         "SPLIT ROWS ()");
     ParserError("CREATE TABLE Foo (i int) DISTRIBUTE BY RANGE(i)");
+    ParserError("CREATE EXTERNAL TABLE Foo DISTRIBUTE BY HASH INTO 4 BUCKETS");
 
     // Combine both
     ParsesOk("CREATE TABLE Foo (i int) DISTRIBUTE BY HASH(i) INTO 4 BUCKETS, RANGE(i) " +
@@ -2391,6 +2405,9 @@ public class ParserTest {
     // Can only have one range clause
     ParserError("CREATE TABLE Foo (i int) DISTRIBUTE BY HASH(i) INTO 4 BUCKETS, RANGE(i) " +
         "SPLIT ROWS ((1, 2.0, 'asdas')), RANGE(i) SPLIT ROWS ((1, 2.0, 'asdas'))");
+    // Range needs to be the last DISTRIBUTE BY clause
+    ParserError("CREATE TABLE Foo (i int) DISTRIBUTE BY RANGE(i) SPLIT ROWS ((1),(2)), " +
+        "HASH (i) INTO 3 BUCKETS");
   }
 
   @Test
@@ -2527,6 +2544,15 @@ public class ParserTest {
     ParsesOk("CREATE TABLE Foo STORED AS PARQUET AS SELECT 1");
     ParsesOk("CREATE TABLE Foo ROW FORMAT DELIMITED STORED AS PARQUETFILE AS SELECT 1");
     ParsesOk("CREATE TABLE Foo TBLPROPERTIES ('a'='b', 'c'='d') AS SELECT * from bar");
+    ParsesOk("CREATE TABLE Foo PRIMARY KEY (a, b) AS SELECT * from bar");
+    ParsesOk("CREATE TABLE Foo PRIMARY KEY (a, b) DISTRIBUTE BY HASH INTO 2 BUCKETS " +
+        "AS SELECT * from bar");
+    ParsesOk("CREATE TABLE Foo PRIMARY KEY (a, b) DISTRIBUTE BY HASH (b) INTO 2 " +
+        "BUCKETS AS SELECT * from bar");
+    ParsesOk("CREATE TABLE Foo PRIMARY KEY (a, b) DISTRIBUTE BY RANGE (b) SPLIT ROWS " +
+        "(('foo'), ('bar')) STORED AS KUDU AS SELECT * from bar");
+    ParsesOk("CREATE TABLE Foo PRIMARY KEY (a, b) DISTRIBUTE BY RANGE SPLIT ROWS " +
+        "(('foo'), ('bar')) STORED AS KUDU AS SELECT * from bar");
 
     // With clause works
     ParsesOk("CREATE TABLE Foo AS with t1 as (select 1) select * from t1");
@@ -2554,8 +2580,10 @@ public class ParserTest {
     ParserError("CREATE TABLE Foo PARTITIONED BY (a, b=2) AS SELECT * from Bar");
 
     // Flexible partitioning
-    ParsesOk("CREATE TABLE Foo DISTRIBUTE BY HASH(i) INTO 4 BUCKETS AS SELECT 1");
-    ParsesOk("CREATE TABLE Foo DISTRIBUTE BY HASH(a) INTO 4 BUCKETS " +
+    ParsesOk("CREATE TABLE Foo PRIMARY KEY (i) DISTRIBUTE BY HASH(i) INTO 4 BUCKETS AS " +
+        "SELECT 1");
+    ParserError("CREATE TABLE Foo DISTRIBUTE BY HASH(i) INTO 4 BUCKETS AS SELECT 1");
+    ParsesOk("CREATE TABLE Foo PRIMARY KEY (a) DISTRIBUTE BY HASH(a) INTO 4 BUCKETS " +
         "TBLPROPERTIES ('a'='b', 'c'='d') AS SELECT * from bar");
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/041fa6d9/fe/src/test/java/org/apache/impala/service/JdbcTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/service/JdbcTest.java b/fe/src/test/java/org/apache/impala/service/JdbcTest.java
index a1f4a29..6309e8e 100644
--- a/fe/src/test/java/org/apache/impala/service/JdbcTest.java
+++ b/fe/src/test/java/org/apache/impala/service/JdbcTest.java
@@ -49,7 +49,7 @@ import com.google.common.collect.Lists;
  * JdbcTest
  *
  * Basic JDBC metadata test. It exercises getTables, getCatalogs, getSchemas,
- * getTableTypes, getColumns.
+ * getTableTypes, getColumns.
  *
  */
 public class JdbcTest {
@@ -204,7 +204,7 @@ public class JdbcTest {
     ResultSet rs = con_.getMetaData().getColumns(null,
         "functional", "alltypessmall", "s%rin%");
 
-    // validate the metadata for the getColumns result set
+    // validate the metadata for the getColumns result set
     ResultSetMetaData rsmd = rs.getMetaData();
     assertEquals("TABLE_CAT", rsmd.getColumnName(1));
     assertTrue(rs.next());
@@ -477,7 +477,7 @@ public class JdbcTest {
   }
 
   /**
-   * Validate the Metadata for the result set of a metadata getColumns call.
+   * Validate the Metadata for the result set of a metadata getColumns call.
    */
   @Test
   public void testMetaDataGetColumnsMetaData() throws SQLException {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/041fa6d9/fe/src/test/java/org/apache/impala/testutil/ImpaladTestCatalog.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/testutil/ImpaladTestCatalog.java b/fe/src/test/java/org/apache/impala/testutil/ImpaladTestCatalog.java
index 1dfed8a..b3167ce 100644
--- a/fe/src/test/java/org/apache/impala/testutil/ImpaladTestCatalog.java
+++ b/fe/src/test/java/org/apache/impala/testutil/ImpaladTestCatalog.java
@@ -43,7 +43,7 @@ public class ImpaladTestCatalog extends ImpaladCatalog {
    * Takes an AuthorizationConfig to bootstrap the backing CatalogServiceCatalog.
    */
   public ImpaladTestCatalog(AuthorizationConfig authzConfig) {
-    super();
+    super("127.0.0.1");
     CatalogServiceCatalog catalogServerCatalog =
         CatalogServiceTestCatalog.createWithAuth(authzConfig.getSentryConfig());
     // Bootstrap the catalog by adding all dbs, tables, and functions.
@@ -51,7 +51,7 @@ public class ImpaladTestCatalog extends ImpaladCatalog {
       // Adding DB should include all tables/fns in that database.
       addDb(db);
     }
-    authPolicy_ = ((CatalogServiceTestCatalog) catalogServerCatalog).getAuthPolicy();
+    authPolicy_ = catalogServerCatalog.getAuthPolicy();
     srcCatalog_ = catalogServerCatalog;
     setIsReady(true);
   }
@@ -88,4 +88,4 @@ public class ImpaladTestCatalog extends ImpaladCatalog {
     db.addTable(newTbl);
     return super.getTable(dbName, tableName);
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/041fa6d9/infra/python/deps/download_requirements
----------------------------------------------------------------------
diff --git a/infra/python/deps/download_requirements b/infra/python/deps/download_requirements
index daa5025..d586104 100755
--- a/infra/python/deps/download_requirements
+++ b/infra/python/deps/download_requirements
@@ -29,5 +29,5 @@ PY26="$(./find_py26.py)"
 "$PY26" pip_download.py virtualenv 13.1.0
 # kudu-python is downloaded separately because pip install attempts to execute a
 # setup.py subcommand for kudu-python that can fail even if the download succeeds.
-"$PY26" pip_download.py kudu-python 0.1.1
+"$PY26" pip_download.py kudu-python 0.2.0
 popd

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/041fa6d9/infra/python/deps/requirements.txt
----------------------------------------------------------------------
diff --git a/infra/python/deps/requirements.txt b/infra/python/deps/requirements.txt
index b433713..2ba0853 100644
--- a/infra/python/deps/requirements.txt
+++ b/infra/python/deps/requirements.txt
@@ -82,13 +82,9 @@ texttable == 0.8.3
 # functional and determines the expected kudu-python version. The version must be listed
 # in the format below including # and spacing. Keep this formatting! The kudu-python
 # version in download_requirements must be kept in sync with this version.
-# kudu-python==0.1.1
+# kudu-python==0.2.0
   Cython == 0.23.4
   numpy == 1.10.4
-  # These should eventually be removed  https://issues.apache.org/jira/browse/KUDU-1456
-  unittest2 == 1.1.0
-    linecache2 == 1.0.0
-    traceback2 == 1.4.0
 
 # For dev purposes, not used in scripting. Version 1.2.1 is the latest that supports 2.6.
 ipython == 1.2.1

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/041fa6d9/testdata/bin/generate-schema-statements.py
----------------------------------------------------------------------
diff --git a/testdata/bin/generate-schema-statements.py b/testdata/bin/generate-schema-statements.py
index ca3441b..4c8ef02 100755
--- a/testdata/bin/generate-schema-statements.py
+++ b/testdata/bin/generate-schema-statements.py
@@ -134,7 +134,7 @@ FILE_FORMAT_MAP = {
     "\nOUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'",
   'avro': 'AVRO',
   'hbase': "'org.apache.hadoop.hive.hbase.HBaseStorageHandler'",
-  'kudu': "'com.cloudera.kudu.hive.KuduStorageHandler'",
+  'kudu': "KUDU",
   }
 
 HIVE_TO_AVRO_TYPE_MAP = {
@@ -193,17 +193,17 @@ def build_table_template(file_format, columns, partition_columns, row_format,
   if file_format == 'hbase':
     return build_hbase_create_stmt_in_hive(columns, partition_columns, table_name)
 
+  primary_keys_clause = ""
+
   partitioned_by = str()
   if partition_columns:
     partitioned_by = 'PARTITIONED BY (%s)' % ', '.join(partition_columns.split('\n'))
 
   row_format_stmt = str()
-  if row_format:
+  if row_format and file_format != 'kudu':
     row_format_stmt = 'ROW FORMAT ' + row_format
 
-  file_format_string = str()
-  if file_format != 'kudu':
-    file_format_string = "STORED AS {file_format}"
+  file_format_string = "STORED AS {file_format}"
 
   tblproperties_clause = "TBLPROPERTIES (\n{0}\n)"
   tblproperties = {}
@@ -218,7 +218,7 @@ def build_table_template(file_format, columns, partition_columns, row_format,
     else:
       tblproperties["avro.schema.url"] = "hdfs://%s/%s/%s/{table_name}.json" \
         % (options.hdfs_namenode, options.hive_warehouse_dir, avro_schema_dir)
-  elif file_format == 'parquet':
+  elif file_format in 'parquet':
     row_format_stmt = str()
   elif file_format == 'kudu':
     # Use partitioned_by to set a trivial hash distribution
@@ -229,11 +229,9 @@ def build_table_template(file_format, columns, partition_columns, row_format,
     kudu_master = os.getenv("KUDU_MASTER_ADDRESS", "127.0.0.1")
     kudu_master_port = os.getenv("KUDU_MASTER_PORT", "7051")
     row_format_stmt = str()
-    tblproperties["storage_handler"] = "com.cloudera.kudu.hive.KuduStorageHandler"
     tblproperties["kudu.master_addresses"] = \
       "{0}:{1}".format(kudu_master, kudu_master_port)
-    tblproperties["kudu.table_name"] = table_name
-    tblproperties["kudu.key_columns"] = columns.split("\n")[0].split(" ")[0]
+    primary_keys_clause = ", PRIMARY KEY (%s)" % columns.split("\n")[0].split(" ")[0]
     # Kudu's test tables are managed.
     external = ""
 
@@ -261,7 +259,8 @@ def build_table_template(file_format, columns, partition_columns, row_format,
   # (e.g. Avro)
   stmt = """
 CREATE {external} TABLE IF NOT EXISTS {{db_name}}{{db_suffix}}.{{table_name}} (
-{columns})
+{columns}
+{primary_keys})
 {partitioned_by}
 {row_format}
 {file_format_string}
@@ -271,6 +270,7 @@ LOCATION '{{hdfs_location}}'
     external=external,
     row_format=row_format_stmt,
     columns=',\n'.join(columns.split('\n')),
+    primary_keys=primary_keys_clause,
     partitioned_by=partitioned_by,
     tblproperties=tblproperties_clause,
     file_format_string=file_format_string

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/041fa6d9/testdata/datasets/functional/functional_schema_template.sql
----------------------------------------------------------------------
diff --git a/testdata/datasets/functional/functional_schema_template.sql b/testdata/datasets/functional/functional_schema_template.sql
index aeeba9b..76e1427 100644
--- a/testdata/datasets/functional/functional_schema_template.sql
+++ b/testdata/datasets/functional/functional_schema_template.sql
@@ -78,7 +78,7 @@ LOAD DATA LOCAL INPATH '{impala_home}/testdata/target/AllTypes/101101.txt' OVERW
 LOAD DATA LOCAL INPATH '{impala_home}/testdata/target/AllTypes/101201.txt' OVERWRITE INTO TABLE {db_name}{db_suffix}.{table_name} PARTITION(year=2010, month=12);
 ---- CREATE_KUDU
 CREATE TABLE {db_name}{db_suffix}.{table_name} (
-  id INT,
+  id INT PRIMARY KEY,
   bool_col BOOLEAN,
   tinyint_col TINYINT,
   smallint_col SMALLINT,
@@ -92,13 +92,7 @@ CREATE TABLE {db_name}{db_suffix}.{table_name} (
   year INT,
   month INT
 )
-DISTRIBUTE BY HASH (id) INTO 3 BUCKETS
-TBLPROPERTIES(
-'storage_handler' = 'com.cloudera.kudu.hive.KuduStorageHandler',
-'kudu.table_name' = '{table_name}',
-'kudu.master_addresses' = '127.0.0.1:7051',
-'kudu.key_columns' = 'id'
-);
+DISTRIBUTE BY HASH (id) INTO 3 BUCKETS STORED AS KUDU;
 ---- DEPENDENT_LOAD_KUDU
 INSERT into TABLE {db_name}{db_suffix}.{table_name}
 SELECT id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col,
@@ -161,7 +155,7 @@ LOAD DATA LOCAL INPATH '{impala_home}/testdata/target/AllTypesSmall/090301.txt'
 LOAD DATA LOCAL INPATH '{impala_home}/testdata/target/AllTypesSmall/090401.txt' OVERWRITE INTO TABLE {db_name}{db_suffix}.{table_name} PARTITION(year=2009, month=4);
 ---- CREATE_KUDU
 CREATE TABLE {db_name}{db_suffix}.{table_name} (
-  id INT,
+  id INT PRIMARY KEY,
   bool_col BOOLEAN,
   tinyint_col TINYINT,
   smallint_col SMALLINT,
@@ -175,13 +169,7 @@ CREATE TABLE {db_name}{db_suffix}.{table_name} (
   year INT,
   month INT
 )
-DISTRIBUTE BY HASH (id) INTO 3 BUCKETS
-TBLPROPERTIES(
-'storage_handler' = 'com.cloudera.kudu.hive.KuduStorageHandler',
-'kudu.table_name' = '{table_name}',
-'kudu.master_addresses' = '127.0.0.1:7051',
-'kudu.key_columns' = 'id'
-);
+DISTRIBUTE BY HASH (id) INTO 3 BUCKETS STORED AS KUDU;
 ---- DEPENDENT_LOAD_KUDU
 INSERT into TABLE {db_name}{db_suffix}.{table_name}
 SELECT id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col,
@@ -225,7 +213,7 @@ LOAD DATA LOCAL INPATH '{impala_home}/testdata/target/AllTypesTiny/090301.txt' O
 LOAD DATA LOCAL INPATH '{impala_home}/testdata/target/AllTypesTiny/090401.txt' OVERWRITE INTO TABLE {db_name}{db_suffix}.{table_name} PARTITION(year=2009, month=4);
 ---- CREATE_KUDU
 CREATE TABLE {db_name}{db_suffix}.{table_name} (
-  id INT,
+  id INT PRIMARY KEY,
   bool_col BOOLEAN,
   tinyint_col TINYINT,
   smallint_col SMALLINT,
@@ -239,13 +227,7 @@ CREATE TABLE {db_name}{db_suffix}.{table_name} (
   year INT,
   month INT
 )
-DISTRIBUTE BY HASH (id) INTO 3 BUCKETS
-TBLPROPERTIES(
-'storage_handler' = 'com.cloudera.kudu.hive.KuduStorageHandler',
-'kudu.table_name' = '{table_name}',
-'kudu.master_addresses' = '127.0.0.1:7051',
-'kudu.key_columns' = 'id'
-);
+DISTRIBUTE BY HASH (id) INTO 3 BUCKETS STORED AS KUDU;
 ---- DEPENDENT_LOAD_KUDU
 INSERT INTO TABLE {db_name}{db_suffix}.{table_name}
 SELECT id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col,
@@ -565,7 +547,7 @@ DROP VIEW IF EXISTS {db_name}{db_suffix}.{table_name};
 DROP TABLE IF EXISTS {db_name}{db_suffix}.{table_name}_idx;
 
 CREATE TABLE {db_name}{db_suffix}.{table_name}_idx (
-  kudu_idx BIGINT,
+  kudu_idx BIGINT PRIMARY KEY,
   id INT,
   bool_col BOOLEAN,
   tinyint_col TINYINT,
@@ -581,14 +563,7 @@ CREATE TABLE {db_name}{db_suffix}.{table_name}_idx (
   month INT,
   day INT
 )
-DISTRIBUTE BY HASH (kudu_idx) INTO 3 BUCKETS
-TBLPROPERTIES(
-'storage_handler' = 'com.cloudera.kudu.hive.KuduStorageHandler',
-'kudu.table_name' = '{table_name}',
-'kudu.master_addresses' = '127.0.0.1:7051',
-'kudu.key_columns' = 'kudu_idx'
-);
-
+DISTRIBUTE BY HASH (kudu_idx) INTO 3 BUCKETS STORED AS KUDU;
 CREATE VIEW {db_name}{db_suffix}.{table_name} AS
 SELECT id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col,
        double_col, date_string_col, string_col, timestamp_col, year, month, day
@@ -651,7 +626,7 @@ LOAD DATA LOCAL INPATH '{impala_home}/testdata/target/AllTypesAggNoNulls/100109.
 LOAD DATA LOCAL INPATH '{impala_home}/testdata/target/AllTypesAggNoNulls/100110.txt' OVERWRITE INTO TABLE {db_name}{db_suffix}.{table_name} PARTITION(year=2010, month=1, day=10);
 ---- CREATE_KUDU
 CREATE TABLE {db_name}{db_suffix}.{table_name} (
-  id INT,
+  id INT PRIMARY KEY,
   bool_col BOOLEAN,
   tinyint_col TINYINT,
   smallint_col SMALLINT,
@@ -666,13 +641,7 @@ CREATE TABLE {db_name}{db_suffix}.{table_name} (
   month INT,
   day INT
 )
-DISTRIBUTE BY HASH (id) INTO 3 BUCKETS
-TBLPROPERTIES(
-'storage_handler' = 'com.cloudera.kudu.hive.KuduStorageHandler',
-'kudu.table_name' = '{table_name}',
-'kudu.master_addresses' = '127.0.0.1:7051',
-'kudu.key_columns' = 'id'
-);
+DISTRIBUTE BY HASH (id) INTO 3 BUCKETS STORED AS KUDU;
 ---- DEPENDENT_LOAD_KUDU
 INSERT into TABLE {db_name}{db_suffix}.{table_name}
 SELECT id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col,
@@ -788,17 +757,11 @@ zip int
 delimited fields terminated by ','  escaped by '\\'
 ---- CREATE_KUDU
 create table {db_name}{db_suffix}.{table_name} (
-  id bigint,
+  id bigint primary key,
   name string,
   zip int
 )
-distribute by range(id) split rows ((1003), (1007))
-tblproperties (
-  'storage_handler' = 'com.cloudera.kudu.hive.KuduStorageHandler',
-  'kudu.master_addresses' = '127.0.0.1:7051',
-  'kudu.table_name' = '{table_name}',
-  'kudu.key_columns' = 'id'
-);
+distribute by range(id) split rows ((1003), (1007)) stored as kudu;
 ====
 ---- DATASET
 functional
@@ -816,17 +779,11 @@ INSERT OVERWRITE TABLE {db_name}{db_suffix}.{table_name} SELECT * FROM {db_name}
 LOAD DATA LOCAL INPATH '{impala_home}/testdata/DimTbl/data.csv' OVERWRITE INTO TABLE {db_name}{db_suffix}.{table_name};
 ---- CREATE_KUDU
 create table {db_name}{db_suffix}.{table_name} (
-  id bigint,
+  id bigint primary key,
   name string,
   zip int
 )
-distribute by range(id) split rows ((1003), (1007))
-tblproperties (
-  'storage_handler' = 'com.cloudera.kudu.hive.KuduStorageHandler',
-  'kudu.master_addresses' = '127.0.0.1:7051',
-  'kudu.table_name' = '{table_name}',
-  'kudu.key_columns' = 'id'
-);
+distribute by range(id) split rows ((1003), (1007)) stored as kudu;
 ====
 ---- DATASET
 functional
@@ -848,15 +805,10 @@ create table {db_name}{db_suffix}.{table_name} (
   test_id bigint,
   test_name string,
   test_zip int,
-  alltypes_id int
+  alltypes_id int,
+  primary key (test_id, test_name, test_zip, alltypes_id)
 )
-distribute by range(test_id) split rows ((1003), (1007))
-tblproperties (
-  'storage_handler' = 'com.cloudera.kudu.hive.KuduStorageHandler',
-  'kudu.master_addresses' = '127.0.0.1:7051',
-  'kudu.table_name' = '{table_name}',
-  'kudu.key_columns' = 'test_id, test_name, test_zip, alltypes_id'
-);
+distribute by range(test_id) split rows ((1003), (1007)) stored as kudu;
 ====
 ---- DATASET
 functional
@@ -1191,16 +1143,10 @@ f2 int
 field string
 ---- CREATE_KUDU
 CREATE TABLE {db_name}{db_suffix}.{table_name} (
-  field STRING,
+  field STRING PRIMARY KEY,
   f2 INT
 )
-DISTRIBUTE BY HASH (field) INTO 3 BUCKETS
-TBLPROPERTIES(
-'storage_handler' = 'com.cloudera.kudu.hive.KuduStorageHandler',
-'kudu.table_name' = '{table_name}',
-'kudu.master_addresses' = '127.0.0.1:7051',
-'kudu.key_columns' = 'field'
-);
+DISTRIBUTE BY HASH (field) INTO 3 BUCKETS STORED AS KUDU;
 ====
 ---- DATASET
 functional
@@ -1303,16 +1249,10 @@ INSERT OVERWRITE TABLE {db_name}{db_suffix}.{table_name} SELECT * FROM {db_name}
 LOAD DATA LOCAL INPATH '{impala_home}/testdata/TinyTable/data.csv' OVERWRITE INTO TABLE {db_name}{db_suffix}.{table_name};
 ---- CREATE_KUDU
 create table {db_name}{db_suffix}.{table_name} (
-  a string,
+  a string primary key,
   b string
 )
-distribute by range(a) split rows (('b'), ('d'))
-tblproperties (
-  'storage_handler' = 'com.cloudera.kudu.hive.KuduStorageHandler',
-  'kudu.master_addresses' = '127.0.0.1:7051',
-  'kudu.table_name' = '{table_name}',
-  'kudu.key_columns' = 'a'
-);
+distribute by range(a) split rows (('b'), ('d')) stored as kudu;
 ====
 ---- DATASET
 functional
@@ -1328,15 +1268,9 @@ INSERT OVERWRITE TABLE {db_name}{db_suffix}.{table_name} SELECT * FROM {db_name}
 LOAD DATA LOCAL INPATH '{impala_home}/testdata/TinyIntTable/data.csv' OVERWRITE INTO TABLE {db_name}{db_suffix}.{table_name};
 ---- CREATE_KUDU
 create table {db_name}{db_suffix}.{table_name} (
-  int_col int
+  int_col int primary key
 )
-distribute by range(int_col) split rows ((2), (4), (6), (8))
-tblproperties (
-  'storage_handler' = 'com.cloudera.kudu.hive.KuduStorageHandler',
-  'kudu.master_addresses' = '127.0.0.1:7051',
-  'kudu.table_name' = '{table_name}',
-  'kudu.key_columns' = 'int_col'
-);
+distribute by range(int_col) split rows ((2), (4), (6), (8)) stored as kudu;
 ====
 ---- DATASET
 functional
@@ -1359,15 +1293,9 @@ LOAD DATA LOCAL INPATH '{impala_home}/testdata/NullTable/data.csv'
 OVERWRITE INTO TABLE {db_name}{db_suffix}.{table_name};
 ---- CREATE_KUDU
 create table {db_name}{db_suffix}.{table_name} (
-  a string, b string, c string, d int, e double, f string, g string
+  a string primary key, b string, c string, d int, e double, f string, g string
 )
-distribute by hash(a) into 3 buckets
-tblproperties (
-  'storage_handler' = 'com.cloudera.kudu.hive.KuduStorageHandler',
-  'kudu.master_addresses' = '127.0.0.1:7051',
-  'kudu.table_name' = '{table_name}',
-  'kudu.key_columns' = 'a'
-);
+distribute by hash(a) into 3 buckets stored as kudu;
 ====
 ---- DATASET
 functional
@@ -1390,15 +1318,9 @@ LOAD DATA LOCAL INPATH '{impala_home}/testdata/NullTable/data.csv'
 OVERWRITE INTO TABLE {db_name}{db_suffix}.{table_name};
 ---- CREATE_KUDU
 create table {db_name}{db_suffix}.{table_name} (
-  a string, b string, c string, d int, e double, f string, g string
+  a string primary key, b string, c string, d int, e double, f string, g string
 )
-distribute by hash(a) into 3 buckets
-tblproperties (
-  'storage_handler' = 'com.cloudera.kudu.hive.KuduStorageHandler',
-  'kudu.master_addresses' = '127.0.0.1:7051',
-  'kudu.table_name' = '{table_name}',
-  'kudu.key_columns' = 'a'
-);
+distribute by hash(a) into 3 buckets stored as kudu;
 ====
 ---- DATASET
 functional
@@ -1474,15 +1396,10 @@ create table {db_name}{db_suffix}.{table_name} (
   zip string,
   description1 string,
   description2 string,
-  income int
-)
+  income int,
+  primary key (id, zip))
 distribute by range(id, zip) split rows (('8600000US01475', '01475'), ('8600000US63121', '63121'), ('8600000US84712', '84712'))
-tblproperties (
-  'storage_handler' = 'com.cloudera.kudu.hive.KuduStorageHandler',
-  'kudu.master_addresses' = '127.0.0.1:7051',
-  'kudu.table_name' = '{table_name}',
-  'kudu.key_columns' = 'id, zip'
-);
+stored as kudu;
 ====
 ---- DATASET
 functional

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/041fa6d9/testdata/datasets/tpch/tpch_schema_template.sql
----------------------------------------------------------------------
diff --git a/testdata/datasets/tpch/tpch_schema_template.sql b/testdata/datasets/tpch/tpch_schema_template.sql
index dc2af15..0512f6a 100644
--- a/testdata/datasets/tpch/tpch_schema_template.sql
+++ b/testdata/datasets/tpch/tpch_schema_template.sql
@@ -57,15 +57,10 @@ create table if not exists {db_name}{db_suffix}.{table_name} (
   L_RECEIPTDATE STRING,
   L_SHIPINSTRUCT STRING,
   L_SHIPMODE STRING,
-  L_COMMENT STRING
+  L_COMMENT STRING,
+  PRIMARY KEY(L_ORDERKEY, L_PARTKEY, L_SUPPKEY, L_LINENUMBER)
 )
-distribute by hash (l_orderkey) into 9 buckets
-tblproperties(
-  'storage_handler' = 'com.cloudera.kudu.hive.KuduStorageHandler',
-  'kudu.master_addresses' = '127.0.0.1:7051',
-  'kudu.table_name' = '{table_name}',
-  'kudu.key_columns' = 'l_orderkey, l_partkey, l_suppkey, l_linenumber'
-);
+distribute by hash (l_orderkey) into 9 buckets stored as kudu;
 ---- DEPENDENT_LOAD
 INSERT OVERWRITE TABLE {db_name}{db_suffix}.{table_name} SELECT * FROM {db_name}.{table_name};
 ---- LOAD
@@ -90,7 +85,7 @@ P_COMMENT STRING
 DELIMITED FIELDS TERMINATED BY '|'
 ---- CREATE_KUDU
 create table if not exists {db_name}{db_suffix}.{table_name} (
-  P_PARTKEY BIGINT,
+  P_PARTKEY BIGINT PRIMARY KEY,
   P_NAME STRING,
   P_MFGR STRING,
   P_BRAND STRING,
@@ -100,13 +95,7 @@ create table if not exists {db_name}{db_suffix}.{table_name} (
   P_RETAILPRICE DOUBLE,
   P_COMMENT STRING
 )
-distribute by hash (p_partkey) into 9 buckets
-tblproperties(
-  'storage_handler' = 'com.cloudera.kudu.hive.KuduStorageHandler',
-  'kudu.master_addresses' = '127.0.0.1:7051',
-  'kudu.table_name' = '{table_name}',
-  'kudu.key_columns' = 'p_partkey'
-);
+distribute by hash (p_partkey) into 9 buckets stored as kudu;
 ---- DEPENDENT_LOAD
 INSERT OVERWRITE TABLE {db_name}{db_suffix}.{table_name} SELECT * FROM {db_name}.{table_name};
 ---- LOAD
@@ -131,15 +120,10 @@ create table if not exists {db_name}{db_suffix}.{table_name} (
   PS_SUPPKEY BIGINT,
   PS_AVAILQTY BIGINT,
   PS_SUPPLYCOST DOUBLE,
-  PS_COMMENT STRING
+  PS_COMMENT STRING,
+  PRIMARY KEY(PS_PARTKEY, PS_SUPPKEY)
 )
-distribute by hash (ps_partkey, ps_suppkey) into 9 buckets
-tblproperties(
-  'storage_handler' = 'com.cloudera.kudu.hive.KuduStorageHandler',
-  'kudu.master_addresses' = '127.0.0.1:7051',
-  'kudu.table_name' = '{table_name}',
-  'kudu.key_columns' = 'ps_partkey, ps_suppkey'
-);
+distribute by hash (ps_partkey, ps_suppkey) into 9 buckets stored as kudu;
 ---- DEPENDENT_LOAD
 INSERT OVERWRITE TABLE {db_name}{db_suffix}.{table_name} SELECT * FROM {db_name}.{table_name};
 ---- LOAD
@@ -162,7 +146,7 @@ S_COMMENT STRING
 DELIMITED FIELDS TERMINATED BY '|'
 ---- CREATE_KUDU
 create table if not exists {db_name}{db_suffix}.{table_name} (
-  S_SUPPKEY BIGINT,
+  S_SUPPKEY BIGINT PRIMARY KEY,
   S_NAME STRING,
   S_ADDRESS STRING,
   S_NATIONKEY SMALLINT,
@@ -170,13 +154,7 @@ create table if not exists {db_name}{db_suffix}.{table_name} (
   S_ACCTBAL DOUBLE,
   S_COMMENT STRING
 )
-distribute by hash (s_suppkey) into 9 buckets
-tblproperties(
-  'storage_handler' = 'com.cloudera.kudu.hive.KuduStorageHandler',
-  'kudu.master_addresses' = '127.0.0.1:7051',
-  'kudu.table_name' = '{table_name}',
-  'kudu.key_columns' = 's_suppkey'
-);
+distribute by hash (s_suppkey) into 9 buckets stored as kudu;
 ---- DEPENDENT_LOAD
 INSERT OVERWRITE TABLE {db_name}{db_suffix}.{table_name} SELECT * FROM {db_name}.{table_name};
 ---- LOAD
@@ -196,18 +174,12 @@ N_COMMENT STRING
 DELIMITED FIELDS TERMINATED BY '|'
 ---- CREATE_KUDU
 create table if not exists {db_name}{db_suffix}.{table_name} (
-  N_NATIONKEY SMALLINT,
+  N_NATIONKEY SMALLINT PRIMARY KEY,
   N_NAME STRING,
   N_REGIONKEY SMALLINT,
   N_COMMENT STRING
 )
-distribute by hash (n_nationkey) into 9 buckets
-tblproperties(
-  'storage_handler' = 'com.cloudera.kudu.hive.KuduStorageHandler',
-  'kudu.master_addresses' = '127.0.0.1:7051',
-  'kudu.table_name' = '{table_name}',
-  'kudu.key_columns' = 'n_nationkey'
-);
+distribute by hash (n_nationkey) into 9 buckets stored as kudu;
 ---- DEPENDENT_LOAD
 INSERT OVERWRITE TABLE {db_name}{db_suffix}.{table_name} SELECT * FROM {db_name}.{table_name};
 ---- LOAD
@@ -226,17 +198,11 @@ R_COMMENT STRING
 DELIMITED FIELDS TERMINATED BY '|'
 ---- CREATE_KUDU
 create table if not exists {db_name}{db_suffix}.{table_name} (
-  R_REGIONKEY SMALLINT,
+  R_REGIONKEY SMALLINT PRIMARY KEY,
   R_NAME STRING,
   R_COMMENT STRING
 )
-distribute by hash (r_regionkey) into 9 buckets
-tblproperties(
-  'storage_handler' = 'com.cloudera.kudu.hive.KuduStorageHandler',
-  'kudu.master_addresses' = '127.0.0.1:7051',
-  'kudu.table_name' = '{table_name}',
-  'kudu.key_columns' = 'r_regionkey'
-);
+distribute by hash (r_regionkey) into 9 buckets stored as kudu;
 ---- DEPENDENT_LOAD
 INSERT OVERWRITE TABLE {db_name}{db_suffix}.{table_name} SELECT * FROM {db_name}.{table_name};
 ---- LOAD
@@ -261,7 +227,7 @@ O_COMMENT STRING
 DELIMITED FIELDS TERMINATED BY '|'
 ---- CREATE_KUDU
 create table if not exists {db_name}{db_suffix}.{table_name} (
-  O_ORDERKEY BIGINT,
+  O_ORDERKEY BIGINT PRIMARY KEY,
   O_CUSTKEY BIGINT,
   O_ORDERSTATUS STRING,
   O_TOTALPRICE DOUBLE,
@@ -271,13 +237,7 @@ create table if not exists {db_name}{db_suffix}.{table_name} (
   O_SHIPPRIORITY INT,
   O_COMMENT STRING
 )
-distribute by hash (o_orderkey) into 9 buckets
-tblproperties(
-  'storage_handler' = 'com.cloudera.kudu.hive.KuduStorageHandler',
-  'kudu.master_addresses' = '127.0.0.1:7051',
-  'kudu.table_name' = '{table_name}',
-  'kudu.key_columns' = 'o_orderkey'
-);
+distribute by hash (o_orderkey) into 9 buckets stored as kudu;
 ---- DEPENDENT_LOAD
 INSERT OVERWRITE TABLE {db_name}{db_suffix}.{table_name} SELECT * FROM {db_name}.{table_name};
 ---- LOAD
@@ -301,7 +261,7 @@ C_COMMENT STRING
 DELIMITED FIELDS TERMINATED BY '|'
 ---- CREATE_KUDU
 create table if not exists {db_name}{db_suffix}.{table_name} (
-  C_CUSTKEY BIGINT,
+  C_CUSTKEY BIGINT PRIMARY KEY,
   C_NAME STRING,
   C_ADDRESS STRING,
   C_NATIONKEY SMALLINT,
@@ -310,54 +270,10 @@ create table if not exists {db_name}{db_suffix}.{table_name} (
   C_MKTSEGMENT STRING,
   C_COMMENT STRING
 )
-distribute by hash (c_custkey) into 9 buckets
-tblproperties(
-  'storage_handler' = 'com.cloudera.kudu.hive.KuduStorageHandler',
-  'kudu.master_addresses' = '127.0.0.1:7051',
-  'kudu.table_name' = '{table_name}',
-  'kudu.key_columns' = 'c_custkey'
-);
+distribute by hash (c_custkey) into 9 buckets stored as kudu;
 ---- DEPENDENT_LOAD
 INSERT OVERWRITE TABLE {db_name}{db_suffix}.{table_name} SELECT * FROM {db_name}.{table_name};
 ---- LOAD
 LOAD DATA LOCAL INPATH '{impala_home}/testdata/impala-data/{db_name}/{table_name}'
 OVERWRITE INTO TABLE {db_name}{db_suffix}.{table_name};
 ====
----- DATASET
-tpch
----- BASE_TABLE_NAME
-revenue
----- COLUMNS
-supplier_no bigint
-total_revenue Decimal(38,4)
----- CREATE_KUDU
-create table if not exists {db_name}{db_suffix}.{table_name} (
-  supplier_no bigint,
-  total_revevue double
-)
-distribute by hash (supplier_no) into 9 buckets
-tblproperties(
-  'storage_handler' = 'com.cloudera.kudu.hive.KuduStorageHandler',
-  'kudu.master_addresses' = '127.0.0.1:7051',
-  'kudu.table_name' = '{table_name}',
-  'kudu.key_columns' = 'supplier_no'
-);
-====
----- DATASET
-tpch
----- BASE_TABLE_NAME
-max_revenue
----- COLUMNS
-max_revenue Decimal(38, 4)
----- CREATE_KUDU
-create table if not exists {db_name}{db_suffix}.{table_name} (
-  max_revenue bigint
-)
-distribute by hash (max_revenue) into 9 buckets
-tblproperties(
-  'storage_handler' = 'com.cloudera.kudu.hive.KuduStorageHandler',
-  'kudu.master_addresses' = '127.0.0.1:7051',
-  'kudu.table_name' = '{table_name}',
-  'kudu.key_columns' = 'max_revenue'
-);
-====

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/041fa6d9/testdata/workloads/functional-planner/queries/PlannerTest/kudu.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/kudu.test b/testdata/workloads/functional-planner/queries/PlannerTest/kudu.test
index 06ce157..d3022a8 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/kudu.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/kudu.test
@@ -5,9 +5,9 @@ PLAN-ROOT SINK
 00:SCAN KUDU [functional_kudu.testtbl]
 ---- SCANRANGELOCATIONS
 NODE 0:
-  ScanToken{table=testtbl, range-partition: [(int64 id=1003), (int64 id=1007))}
-  ScanToken{table=testtbl, range-partition: [(int64 id=1007), <end>)}
-  ScanToken{table=testtbl, range-partition: [<start>, (int64 id=1003))}
+  ScanToken{table=impala::functional_kudu.testtbl, range-partition: [(int64 id=1003), (int64 id=1007))}
+  ScanToken{table=impala::functional_kudu.testtbl, range-partition: [(int64 id=1007), <end>)}
+  ScanToken{table=impala::functional_kudu.testtbl, range-partition: [<start>, (int64 id=1003))}
 ---- DISTRIBUTEDPLAN
 PLAN-ROOT SINK
 |
@@ -23,9 +23,9 @@ PLAN-ROOT SINK
    kudu predicates: name = '10'
 ---- SCANRANGELOCATIONS
 NODE 0:
-  ScanToken{table=testtbl, range-partition: [(int64 id=1003), (int64 id=1007))}
-  ScanToken{table=testtbl, range-partition: [(int64 id=1007), <end>)}
-  ScanToken{table=testtbl, range-partition: [<start>, (int64 id=1003))}
+  ScanToken{table=impala::functional_kudu.testtbl, range-partition: [(int64 id=1003), (int64 id=1007))}
+  ScanToken{table=impala::functional_kudu.testtbl, range-partition: [(int64 id=1007), <end>)}
+  ScanToken{table=impala::functional_kudu.testtbl, range-partition: [<start>, (int64 id=1003))}
 ---- DISTRIBUTEDPLAN
 PLAN-ROOT SINK
 |
@@ -111,7 +111,7 @@ PLAN-ROOT SINK
    kudu predicates: id <= 20, zip <= 30, id >= 10, zip < 50, zip <= 5, zip > 1, zip >= 0, name = 'foo'
 ---- SCANRANGELOCATIONS
 NODE 0:
-  ScanToken{table=testtbl, range-partition: [<start>, (int64 id=1003))}
+  ScanToken{table=impala::functional_kudu.testtbl, range-partition: [<start>, (int64 id=1003))}
 ---- DISTRIBUTEDPLAN
 PLAN-ROOT SINK
 |
@@ -131,7 +131,7 @@ PLAN-ROOT SINK
    kudu predicates: id <= 60, id < 40, id < 103
 ---- SCANRANGELOCATIONS
 NODE 0:
-  ScanToken{table=testtbl, range-partition: [<start>, (int64 id=1003))}
+  ScanToken{table=impala::functional_kudu.testtbl, range-partition: [<start>, (int64 id=1003))}
 ---- DISTRIBUTEDPLAN
 PLAN-ROOT SINK
 |
@@ -152,9 +152,9 @@ PLAN-ROOT SINK
    kudu predicates: name = 'a'
 ---- SCANRANGELOCATIONS
 NODE 0:
-  ScanToken{table=testtbl, range-partition: [(int64 id=1003), (int64 id=1007))}
-  ScanToken{table=testtbl, range-partition: [(int64 id=1007), <end>)}
-  ScanToken{table=testtbl, range-partition: [<start>, (int64 id=1003))}
+  ScanToken{table=impala::functional_kudu.testtbl, range-partition: [(int64 id=1003), (int64 id=1007))}
+  ScanToken{table=impala::functional_kudu.testtbl, range-partition: [(int64 id=1007), <end>)}
+  ScanToken{table=impala::functional_kudu.testtbl, range-partition: [<start>, (int64 id=1003))}
 ---- DISTRIBUTEDPLAN
 PLAN-ROOT SINK
 |
@@ -175,9 +175,9 @@ PLAN-ROOT SINK
    predicates: name IS NULL, CAST(sin(id) AS BOOLEAN) = TRUE
 ---- SCANRANGELOCATIONS
 NODE 0:
-  ScanToken{table=testtbl, range-partition: [(int64 id=1003), (int64 id=1007))}
-  ScanToken{table=testtbl, range-partition: [(int64 id=1007), <end>)}
-  ScanToken{table=testtbl, range-partition: [<start>, (int64 id=1003))}
+  ScanToken{table=impala::functional_kudu.testtbl, range-partition: [(int64 id=1003), (int64 id=1007))}
+  ScanToken{table=impala::functional_kudu.testtbl, range-partition: [(int64 id=1007), <end>)}
+  ScanToken{table=impala::functional_kudu.testtbl, range-partition: [<start>, (int64 id=1003))}
 ---- DISTRIBUTEDPLAN
 PLAN-ROOT SINK
 |

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/041fa6d9/testdata/workloads/functional-query/queries/QueryTest/create_kudu.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/create_kudu.test b/testdata/workloads/functional-query/queries/QueryTest/create_kudu.test
deleted file mode 100644
index 835e273..0000000
--- a/testdata/workloads/functional-query/queries/QueryTest/create_kudu.test
+++ /dev/null
@@ -1,90 +0,0 @@
-====
----- QUERY
-# Create managed Kudu table
-create table managed_kudu
-( id int, f float, d double, s string, v varchar(10), t tinyint, m smallint )
-distribute by hash into 3 buckets
-tblproperties
-(
-'storage_handler' = 'com.cloudera.kudu.hive.KuduStorageHandler',
-'kudu.table_name' = 'managed_kudu',
-'kudu.master_addresses' = '0.0.0.0:7051',
-'kudu.key_columns' = 'id'
-)
----- RESULTS
-====
----- QUERY
-describe managed_kudu
----- RESULTS
-'id','int',''
-'f','float',''
-'d','double',''
-'s','string',''
-'v','varchar(10)',''
-'t','tinyint',''
-'m','smallint',''
----- TYPES
-STRING,STRING,STRING
-====
----- QUERY
-# Create external kudu table with non-matching schema (name)
-create external table external_kudu
-( id int, f float, do double, s string, v varchar(10), t tinyint, m smallint )
-tblproperties
-(
-'storage_handler' = 'com.cloudera.kudu.hive.KuduStorageHandler',
-'kudu.table_name' = 'managed_kudu',
-'kudu.master_addresses' = '0.0.0.0:7051',
-'kudu.key_columns' = 'id'
-)
----- CATCH
-ImpalaRuntimeException: Table external_kudu (managed_kudu) has a different schema in Kudu than in Hive.
-====
----- QUERY
-# Create external kudu table with non-matching schema (type)
-create external table external_kudu
-( id bigint, f float, d double, s string, v varchar(10), t tinyint, m smallint )
-tblproperties
-(
-'storage_handler' = 'com.cloudera.kudu.hive.KuduStorageHandler',
-'kudu.table_name' = 'managed_kudu',
-'kudu.master_addresses' = '0.0.0.0:7051',
-'kudu.key_columns' = 'id'
-)
----- CATCH
-ImpalaRuntimeException: Table external_kudu (managed_kudu) has a different schema in Kudu than in Hive.
-====
----- QUERY
-# Create external kudu table with matching schema
-create external table external_kudu
-( id int, f float, d double, s string, v varchar(10), t tinyint, m smallint )
-tblproperties
-(
-'storage_handler' = 'com.cloudera.kudu.hive.KuduStorageHandler',
-'kudu.table_name' = 'managed_kudu',
-'kudu.master_addresses' = '0.0.0.0:7051',
-'kudu.key_columns' = 'id'
-)
----- RESULTS
-====
----- QUERY
-describe external_kudu
----- RESULTS
-'id','int',''
-'f','float',''
-'d','double',''
-'s','string',''
-'v','varchar(10)',''
-'t','tinyint',''
-'m','smallint',''
----- TYPES
-STRING,STRING,STRING
-====
----- QUERY
-drop table external_kudu
----- RESULTS
-=====
----- QUERY
-drop table managed_kudu
----- RESULTS
-=====

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/041fa6d9/testdata/workloads/functional-query/queries/QueryTest/kudu-scan-node.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/kudu-scan-node.test b/testdata/workloads/functional-query/queries/QueryTest/kudu-scan-node.test
index 8aa1457..494bb58 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/kudu-scan-node.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/kudu-scan-node.test
@@ -1,7 +1,7 @@
 ====
 ---- QUERY
 # Make sure LIMIT is enforced.
-select * from dimtbl order by id limit 1;
+select * from functional_kudu.dimtbl order by id limit 1;
 ---- RESULTS
 1001,'Name1',94611
 ---- TYPES
@@ -10,8 +10,8 @@ BIGINT, STRING, INT
 ---- QUERY
 # Make sure that we can list the columns to be scanned in any order, that predicates
 # work and that we can have predicates on columns not referenced elsewhere.
-select zip, id from dimtbl where id >= 1000 and 1002 >= id and 94611 = zip and
-'Name1' = name order by id;
+select zip, id from functional_kudu.dimtbl where id >= 1000 and 1002 >= id and
+94611 = zip and 'Name1' = name order by id;
 ---- RESULTS
 94611,1001
 ---- TYPES
@@ -20,14 +20,8 @@ INT, BIGINT
 ---- QUERY
 # Regression test for IMPALA-2740, a NULL value from a previously filtered row would
 # carry over into the next unfiltered row (the result below would incorrectly be 2,NULL).
-USE kududb_test;
-CREATE TABLE impala_2740 (key INT, value INT)
-    DISTRIBUTE BY HASH INTO 3 BUCKETS
-    TBLPROPERTIES(
-        'storage_handler' = 'com.cloudera.kudu.hive.KuduStorageHandler',
-        'kudu.table_name' = 'impala_2740',
-        'kudu.master_addresses' = '127.0.0.1',
-        'kudu.key_columns' = 'key');
+CREATE TABLE impala_2740 (key INT PRIMARY KEY, value INT)
+  DISTRIBUTE BY HASH (key) INTO 3 BUCKETS STORED AS KUDU;
 INSERT INTO impala_2740 VALUES (1, NULL), (2, -2);
 SELECT * FROM impala_2740 WHERE key != 1;
 ---- RESULTS
@@ -40,20 +34,10 @@ INT, INT
 # threads that are never started. The union and both scans land in the same fragment which
 # is run on all impalads. However, for the t1 table there is only as single scan range,
 # so two of the scan instances get empty scan ranges.
-CREATE TABLE impala_2635_t1 (id BIGINT, name STRING)
-    DISTRIBUTE BY RANGE SPLIT ROWS ((0))
-    TBLPROPERTIES(
-        'storage_handler' = 'com.cloudera.kudu.hive.KuduStorageHandler',
-        'kudu.table_name' = 'impala_2635_t1',
-        'kudu.master_addresses' = '127.0.0.1',
-        'kudu.key_columns' = 'id');
-CREATE TABLE impala_2635_t2 (id BIGINT, name STRING)
-    DISTRIBUTE BY HASH(id) INTO 16 BUCKETS
-    TBLPROPERTIES(
-        'storage_handler' = 'com.cloudera.kudu.hive.KuduStorageHandler',
-        'kudu.table_name' = 'impala_2635_t2',
-        'kudu.master_addresses' = '127.0.0.1',
-        'kudu.key_columns' = 'id');
+CREATE TABLE impala_2635_t1 (id BIGINT PRIMARY KEY, name STRING)
+  DISTRIBUTE BY HASH (id) INTO 3 BUCKETS STORED AS KUDU;
+CREATE TABLE impala_2635_t2 (id BIGINT PRIMARY KEY, name STRING)
+  DISTRIBUTE BY HASH(id) INTO 16 BUCKETS STORED AS KUDU;
 INSERT INTO impala_2635_t1 VALUES (0, 'Foo');
 INSERT INTO impala_2635_t2 VALUES (1, 'Blah');
 SELECT * FROM impala_2635_t1 UNION ALL SELECT * FROM impala_2635_t2;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/041fa6d9/testdata/workloads/functional-query/queries/QueryTest/kudu-show-create.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/kudu-show-create.test b/testdata/workloads/functional-query/queries/QueryTest/kudu-show-create.test
deleted file mode 100644
index b07efb7..0000000
--- a/testdata/workloads/functional-query/queries/QueryTest/kudu-show-create.test
+++ /dev/null
@@ -1,16 +0,0 @@
-====
----- QUERY
-SHOW CREATE TABLE functional_kudu.dimtbl
----- RESULTS
-CREATE TABLE functional_kudu.dimtbl (
-  id BIGINT,
-  name STRING,
-  zip INT
-)
-TBLPROPERTIES (
-  'kudu.master_addresses'='127.0.0.1:7051',
-  'kudu.key_columns'='id',
-  'kudu.table_name'='dimtbl',
-  'transient_lastDdlTime'='1441325601',
-  'storage_handler'='com.cloudera.kudu.hive.KuduStorageHandler')
-====

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/041fa6d9/testdata/workloads/functional-query/queries/QueryTest/kudu_alter.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/kudu_alter.test b/testdata/workloads/functional-query/queries/QueryTest/kudu_alter.test
index b32e0d0..e6814e1 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/kudu_alter.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/kudu_alter.test
@@ -1,20 +1,13 @@
 ====
 ---- QUERY
-create table simple (id int, name string, valf float, vali bigint)
-distribute by hash into 3 buckets
-TBLPROPERTIES(
- 'storage_handler' = 'com.cloudera.kudu.hive.KuduStorageHandler',
- 'kudu.table_name' = 'query_test_simple',
- 'kudu.split_keys' = '[[10], [30]]',
- 'kudu.master_addresses' = '127.0.0.1',
- 'kudu.key_columns' = 'id'
-)
+create table simple (id int primary key, name string, valf float, vali bigint)
+  distribute by hash (id) into 3 buckets stored as kudu
 ---- RESULTS
 ====
 ---- QUERY
 -- Alter master address to a different location
 alter table simple set tblproperties (
-  'kudu.master_addresses' = '192.168.0.1'
+  'kudu.master_addresses' = 'localhost'
 )
 ---- RESULTS
 ====
@@ -22,7 +15,7 @@ alter table simple set tblproperties (
 -- Show that new address is picked up
 describe formatted simple
 ---- RESULTS: VERIFY_IS_SUBSET
-'','kudu.master_addresses','192.168.0.1         '
+'','kudu.master_addresses','localhost           '
 ---- TYPES
 STRING,STRING,STRING
 ====
@@ -31,6 +24,12 @@ alter table simple set tblproperties ('kudu.master_addresses' = '127.0.0.1')
 ---- RESULTS
 ====
 ---- QUERY
+-- Try to use an invalid master address
+alter table simple set tblproperties ('kudu.master_addresses' = 'invalid_host')
+---- CATCH
+ImpalaRuntimeException: Kudu table 'impala::$DATABASE.simple' does not exist on master 'invalid_host'
+====
+---- QUERY
 alter table simple rename to simple_new;
 ---- RESULTS
 ====

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/041fa6d9/testdata/workloads/functional-query/queries/QueryTest/kudu_create.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/kudu_create.test b/testdata/workloads/functional-query/queries/QueryTest/kudu_create.test
new file mode 100644
index 0000000..6950fb6
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/kudu_create.test
@@ -0,0 +1,105 @@
+====
+---- QUERY
+# This test file contains several cases for what basically amount to analysis errors,
+# but they only show up at runtime. These cases correspond to constraints enforced by
+# the Kudu storage engine.
+#
+# Incompatible column types in CTAS.
+create table t primary key (id) distribute by hash (id) into 3 buckets
+stored as kudu
+as select * from functional.alltypestiny
+---- CATCH
+ImpalaRuntimeException: Type TIMESTAMP is not supported in Kudu
+====
+---- QUERY
+create table t primary key (id) distribute by hash (id) into 3 buckets
+stored as kudu
+as select c1 as id from functional.decimal_tiny
+---- CATCH
+ImpalaRuntimeException: Type DECIMAL(10,4) is not supported in Kudu
+====
+---- QUERY
+create table t (a int, b array<string>, primary key(a)) distribute by hash (a)
+into 3 buckets stored as kudu
+---- CATCH
+ImpalaRuntimeException: Non-scalar type ARRAY<STRING> is not supported in Kudu
+====
+---- QUERY
+create table t primary key (id) distribute by hash (id) into 3 buckets
+stored as kudu
+as select id, int_col from functional.alltypestiny;
+select * from t;
+---- RESULTS
+0,0
+1,1
+2,0
+3,1
+4,0
+5,1
+6,0
+7,1
+---- TYPES
+INT,INT
+====
+---- QUERY
+# Boolean primary key column
+create table tab (x int, y boolean, primary key(x, y))
+  distribute by hash (x) into 3 buckets stored as kudu
+---- CATCH
+NonRecoverableException: Key column may not have type of BOOL, FLOAT, or DOUBLE
+====
+---- QUERY
+# Float primary key column
+create table tab (x int, y float, primary key(x, y))
+  distribute by hash (x) into 3 buckets stored as kudu
+---- CATCH
+NonRecoverableException: Key column may not have type of BOOL, FLOAT, or DOUBLE
+====
+---- QUERY
+# Primary keys should be declared first
+# TODO: See KUDU-1709 for improving Kudu error messages.
+create table tab (x int, y int, primary key(y))
+  distribute by hash (y) into 3 buckets stored as kudu
+---- CATCH
+NonRecoverableException: Got out-of-order key column: name: "y" type: INT32 is_key: true is_nullable: false cfile_block_size: 0
+====
+---- QUERY
+# Small number of hash buckets
+create table tab (a int, b int, c int, d int, primary key(a, b, c))
+  distribute by hash(a,b) into 8 buckets, hash(c) into 1 buckets stored as kudu
+---- CATCH
+NonRecoverableException: must have at least two hash buckets
+====
+---- QUERY
+# Same column in multiple hash based distributions
+create table tab (a int, b int, primary key (a))
+  distribute by hash (a) into 3 buckets, hash (a) into 2 buckets stored as kudu
+---- CATCH
+NonRecoverableException: hash bucket schema components must not contain columns in common
+====
+---- QUERY
+# Same column referenced multiple times in the same hash-based distribution
+create table tab (a int primary key) distribute by hash (a, a, a) into 3 buckets
+stored as kudu
+---- CATCH
+NonRecoverableException: hash bucket schema components must not contain columns in common
+====
+---- QUERY
+# Kudu table that uses Impala keywords as table name and column names
+create table `add`(`analytic` int, `function` int, primary key(`analytic`, `function`))
+distribute by hash (`analytic`) into 4 buckets, range (`function`) split rows ((1), (10))
+stored as kudu;
+insert into `add` select id, int_col from functional.alltypestiny;
+select * from `add`
+---- RESULTS
+0,0
+1,1
+2,0
+3,1
+4,0
+5,1
+6,0
+7,1
+---- TYPES
+INT,INT
+====

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/041fa6d9/testdata/workloads/functional-query/queries/QueryTest/kudu_crud.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/kudu_crud.test b/testdata/workloads/functional-query/queries/QueryTest/kudu_crud.test
index 8a58375..a06d203 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/kudu_crud.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/kudu_crud.test
@@ -1,52 +1,25 @@
 ====
 ---- QUERY
--- Test KuduClient will automatically set the default port if no port is given
-create table tdata_no_port ( id int, name string, valf float, vali bigint)
-DISTRIBUTE BY RANGE SPLIT ROWS ((10), (30))
-TBLPROPERTIES(
-'storage_handler' = 'com.cloudera.kudu.hive.KuduStorageHandler',
-'kudu.table_name' = 'tdata_no_port',
-'kudu.master_addresses' = '127.0.0.1',
-'kudu.key_columns' = 'id'
- )
----- RESULTS
-====
----- QUERY
 -- Invalid hostname
-create table tdata_bogus_host ( id int, name string, valf float, vali bigint)
-DISTRIBUTE BY RANGE SPLIT ROWS ((10), (30))
-TBLPROPERTIES(
-'storage_handler' = 'com.cloudera.kudu.hive.KuduStorageHandler',
-'kudu.table_name' = 'tdata_no_port',
-'kudu.master_addresses' = 'bogus host name',
-'kudu.key_columns' = 'id'
- )
+create table tdata_bogus_host (id int primary key, name string, valf float, vali bigint)
+  DISTRIBUTE BY RANGE SPLIT ROWS ((10), (30)) STORED AS KUDU
+  TBLPROPERTIES('kudu.master_addresses' = 'bogus host name')
 ---- CATCH
 Couldn't resolve this master's address bogus host name:7051
 ====
 ---- QUERY
 -- Non-existing host
-create table tdata_non_existing_host ( id int, name string, valf float, vali bigint)
-DISTRIBUTE BY RANGE SPLIT ROWS ((10), (30))
-TBLPROPERTIES(
-'storage_handler' = 'com.cloudera.kudu.hive.KuduStorageHandler',
-'kudu.table_name' = 'tdata_no_port',
-'kudu.master_addresses' = 'bogus.host.name',
-'kudu.key_columns' = 'id'
- )
+create table tdata_non_existing_host
+(id int primary key, name string, valf float, vali bigint)
+  DISTRIBUTE BY RANGE SPLIT ROWS ((10), (30)) STORED AS KUDU
+  TBLPROPERTIES('kudu.master_addresses' = 'bogus.host.name')
 ---- CATCH
 Couldn't resolve this master's address bogus.host.name:7051
 ====
 ---- QUERY
 create table tdata
-( id int, name string, valf float, vali bigint, valv varchar(20), valb boolean)
-DISTRIBUTE BY RANGE SPLIT ROWS ((10), (30))
-TBLPROPERTIES(
-'storage_handler' = 'com.cloudera.kudu.hive.KuduStorageHandler',
-'kudu.table_name' = 'tdata',
-'kudu.master_addresses' = '0.0.0.0:7051',
-'kudu.key_columns' = 'id'
- )
+  (id int primary key, name string, valf float, vali bigint, valv varchar(20), valb boolean)
+  DISTRIBUTE BY RANGE SPLIT ROWS ((10), (30)) STORED AS KUDU
 ---- RESULTS
 ====
 ---- QUERY
@@ -189,14 +162,9 @@ INT,STRING,STRING,BOOLEAN
 ====
 ---- QUERY
 -- Test that string case is ignored
-create table ignore_column_case ( Id int, NAME string, vAlf float, vali bigint)
-DISTRIBUTE BY RANGE SPLIT ROWS ((10, 'b'), (30, 'a'))
-TBLPROPERTIES(
-'storage_handler' = 'com.cloudera.kudu.hive.KuduStorageHandler',
-'kudu.table_name' = 'ignore_column_case',
-'kudu.master_addresses' = '127.0.0.1',
-'kudu.key_columns' = 'Id,NAME'
- )
+create table ignore_column_case (Id int, NAME string, vAlf float, vali bigint,
+  primary key (Id, NAME)) DISTRIBUTE BY RANGE SPLIT ROWS ((10, 'b'), (30, 'a'))
+  STORED AS KUDU
 ---- RESULTS
 ====
 ---- QUERY
@@ -259,15 +227,8 @@ delete ignore a from tdata a, tdata b where a.id = 666
 ---- QUERY
 # IMPALA-3454: A delete that requires a rewrite may not get the Kudu column order correct
 # if the Kudu columns are of different types.
-create table impala_3454
-(key_1 tinyint, key_2 bigint)
-DISTRIBUTE BY HASH INTO 3 BUCKETS
-TBLPROPERTIES(
-'storage_handler' = 'com.cloudera.kudu.hive.KuduStorageHandler',
-'kudu.table_name' = 'impala_3454',
-'kudu.master_addresses' = '0.0.0.0:7051',
-'kudu.key_columns' = 'key_1,key_2'
- )
+create table impala_3454 (key_1 tinyint, key_2 bigint, PRIMARY KEY (key_1, key_2))
+  DISTRIBUTE BY HASH INTO 3 BUCKETS STORED AS KUDU
 ---- RESULTS
 ====
 ---- QUERY

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/041fa6d9/testdata/workloads/functional-query/queries/QueryTest/kudu_partition_ddl.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/kudu_partition_ddl.test b/testdata/workloads/functional-query/queries/QueryTest/kudu_partition_ddl.test
index e5b2a95..bd61407 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/kudu_partition_ddl.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/kudu_partition_ddl.test
@@ -1,65 +1,9 @@
 ====
 ---- QUERY
--- Test allowing range distribution on a subset of the primary keys
-create table simple_range_with_key_projection ( id int, name string, valf float, vali bigint)
-DISTRIBUTE BY RANGE(name) SPLIT ROWS (('abc'))
-TBLPROPERTIES(
-'storage_handler' = 'com.cloudera.kudu.hive.KuduStorageHandler',
-'kudu.table_name' = 'simple_range_with_key_projection',
-'kudu.master_addresses' = '127.0.0.1',
-'kudu.key_columns' = 'id, name'
- )
----- RESULTS
-====
----- QUERY
--- Test error handling for creating split rows on a non-key column
-create table simple_range_non_key_col ( id int, name string, valf float, vali bigint)
-DISTRIBUTE BY RANGE(name) SPLIT ROWS (('abc'))
-TBLPROPERTIES(
-'storage_handler' = 'com.cloudera.kudu.hive.KuduStorageHandler',
-'kudu.table_name' = 'simple_range_non_key_col',
-'kudu.master_addresses' = '127.0.0.1',
-'kudu.key_columns' = 'id'
- )
----- CATCH
-must specify only primary key columns for range partition component
-====
----- QUERY
--- Test error handling for split rows and wrong type
-create table simple_hash ( id int, name string, valf float, vali bigint)
-DISTRIBUTE BY RANGE(id) SPLIT ROWS (('abc'))
-TBLPROPERTIES(
-'storage_handler' = 'com.cloudera.kudu.hive.KuduStorageHandler',
-'kudu.table_name' = 'simple_hash',
-'kudu.master_addresses' = '127.0.0.1',
-'kudu.key_columns' = 'id, name'
- )
----- CATCH
-Expected int32 literal for column 'id' got 'abc'
-====
----- QUERY
--- Test error handling for distribute clauses
-create table simple_hash ( id int, name string, valf float, vali bigint)
-DISTRIBUTE BY RANGE(col_does_not_exist) SPLIT ROWS ((1))
-TBLPROPERTIES(
-'storage_handler' = 'com.cloudera.kudu.hive.KuduStorageHandler',
-'kudu.table_name' = 'simple_hash',
-'kudu.master_addresses' = '127.0.0.1',
-'kudu.key_columns' = 'id, name'
- )
----- CATCH
-Unknown column: col_does_not_exist
-====
----- QUERY
 -- Test HASH partitioning
-create table simple_hash ( id int, name string, valf float, vali bigint)
-DISTRIBUTE BY HASH(id) INTO 4 BUCKETS, HASH(name) INTO 2 BUCKETS
-TBLPROPERTIES(
-'storage_handler' = 'com.cloudera.kudu.hive.KuduStorageHandler',
-'kudu.table_name' = 'simple_hash',
-'kudu.master_addresses' = '127.0.0.1',
-'kudu.key_columns' = 'id, name'
- )
+create table simple_hash (id int, name string, valf float, vali bigint,
+  PRIMARY KEY (id, name)) DISTRIBUTE BY HASH(id) INTO 4 BUCKETS,
+  HASH(name) INTO 2 BUCKETS STORED AS KUDU
 ---- RESULTS
 ====
 ---- QUERY
@@ -80,14 +24,9 @@ INT,STRING,STRING,STRING,INT
 ====
 ---- QUERY
 -- Test HASH and RANGE partitioning
-create table simple_hash_range ( id int, name string, valf float, vali bigint)
-DISTRIBUTE BY HASH(id) INTO 4 BUCKETS, RANGE(id, name) SPLIT ROWS ((10, 'martin'), (20, 'Peter'))
-TBLPROPERTIES(
-'storage_handler' = 'com.cloudera.kudu.hive.KuduStorageHandler',
-'kudu.table_name' = 'simple_hash_range',
-'kudu.master_addresses' = '127.0.0.1',
-'kudu.key_columns' = 'id, name'
- )
+create table simple_hash_range (id int, name string, valf float, vali bigint,
+  PRIMARY KEY (id, name)) DISTRIBUTE BY HASH(id) INTO 4 BUCKETS,
+  RANGE(id, name) SPLIT ROWS ((10, 'martin'), (20, 'Peter')) STORED AS KUDU
 ---- RESULTS
 ====
 ---- QUERY
@@ -112,13 +51,8 @@ INT,STRING,STRING,STRING,INT
 ====
 ---- QUERY
 create table simple_hash_range_ctas
-DISTRIBUTE BY HASH(id) INTO 4 BUCKETS, RANGE(id, name) SPLIT ROWS ((10, 'martin'), (20, 'Peter'))
-TBLPROPERTIES(
-'storage_handler' = 'com.cloudera.kudu.hive.KuduStorageHandler',
-'kudu.table_name' = 'simple_hash_range_ctas',
-'kudu.master_addresses' = '127.0.0.1',
-'kudu.key_columns' = 'id, name'
- )
+  PRIMARY KEY (id, name) DISTRIBUTE BY HASH(id) INTO 4 BUCKETS,
+  RANGE(id, name) SPLIT ROWS ((10, 'martin'), (20, 'Peter')) STORED AS KUDU
 as select * from simple_hash
 ---- RESULTS
 'Inserted 0 row(s)'
@@ -145,14 +79,8 @@ INT,STRING,STRING,STRING,INT
 ====
 ---- QUERY
 -- Test HASH defaults to all columns
-create table simple_hash_all_columns ( id int, name string, valf float, vali bigint)
-DISTRIBUTE BY HASH INTO 4 BUCKETS
-TBLPROPERTIES(
-'storage_handler' = 'com.cloudera.kudu.hive.KuduStorageHandler',
-'kudu.table_name' = 'simple_hash_all_columns',
-'kudu.master_addresses' = '127.0.0.1',
-'kudu.key_columns' = 'id, name'
- )
+create table simple_hash_all_columns (id int, name string, valf float, vali bigint,
+  PRIMARY KEY (id, name)) DISTRIBUTE BY HASH INTO 4 BUCKETS STORED AS KUDU
 ---- RESULTS
 ====
 ---- QUERY
@@ -169,14 +97,9 @@ INT,STRING,STRING,STRING,INT
 ====
 ---- QUERY
 -- Test RANGE defaults to all columns
-create table simple_range_all_columns ( id int, name string, valf float, vali bigint)
-DISTRIBUTE BY RANGE SPLIT ROWS ((1, 'a'), (2, 'b'))
-TBLPROPERTIES(
-'storage_handler' = 'com.cloudera.kudu.hive.KuduStorageHandler',
-'kudu.table_name' = 'simple_range_all_columns',
-'kudu.master_addresses' = '127.0.0.1',
-'kudu.key_columns' = 'id, name'
- )
+create table simple_range_all_columns (id int, name string, valf float, vali bigint,
+  PRIMARY KEY (id, name)) DISTRIBUTE BY RANGE SPLIT ROWS ((1, 'a'), (2, 'b'))
+  STORED AS KUDU
 ---- RESULTS
 ====
 ---- QUERY

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/041fa6d9/testdata/workloads/functional-query/queries/QueryTest/kudu_stats.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/kudu_stats.test b/testdata/workloads/functional-query/queries/QueryTest/kudu_stats.test
index 828d430..589bbf0 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/kudu_stats.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/kudu_stats.test
@@ -1,14 +1,8 @@
 ====
 ---- QUERY
-create table simple (id int, name string, valf float, vali bigint)
-DISTRIBUTE BY RANGE SPLIT ROWS ((10), (30))
-TBLPROPERTIES(
- 'storage_handler' = 'com.cloudera.kudu.hive.KuduStorageHandler',
- 'kudu.table_name' = 'simple',
- 'kudu.master_addresses' = '127.0.0.1',
- 'kudu.key_columns' = 'id',
- 'kudu.num_tablet_replicas' = '2'
-)
+create table simple (id int primary key, name string, valf float, vali bigint)
+  DISTRIBUTE BY RANGE SPLIT ROWS ((10), (30)) STORED AS KUDU
+  TBLPROPERTIES('kudu.num_tablet_replicas' = '2')
 ---- RESULTS
 ====
 ---- QUERY

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/041fa6d9/tests/common/__init__.py
----------------------------------------------------------------------
diff --git a/tests/common/__init__.py b/tests/common/__init__.py
index 010f973..665bfe2 100644
--- a/tests/common/__init__.py
+++ b/tests/common/__init__.py
@@ -1,2 +1,2 @@
-KUDU_MASTER_HOST = "127.0.0.1"
-KUDU_MASTER_PORT = 7051
+# This value should match the impalad startup flag -kudu_master_hosts
+KUDU_MASTER_HOSTS = "127.0.0.1"