You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by mj...@apache.org on 2017/07/22 01:19:33 UTC

[1/3] incubator-impala git commit: IMPALA-5582: Store sentry privileges in lower case

Repository: incubator-impala
Updated Branches:
  refs/heads/master daff8eb0c -> ad0c6e749


IMPALA-5582: Store sentry privileges in lower case

Privileges granted to a role on a db/table whose name contains
upper case characters can disappear after a few seconds.
A privilege is inserted into the catalogObjectCache using a key
built from the db/table name, and the key is converted to lower
case before insertion.
The privilege name returned by SentryProxy is always lower case,
which might not match the privilege name built in the catalog.
The mismatch triggers an update of the catalog object followed by
a removal of the old object. Since both use the same lower-case
key, the removal ends up deleting the newly updated object.

This change also adds a new catalogd startup option,
--sentry_catalog_polling_frequency_s, which configures how often
(in seconds) catalogd polls the Sentry service for policy changes.
The default value is 60 seconds.

Test:
Added a test that grants privileges on 3 tables and 3 databases
whose names are specified in lower, upper and mixed case. The test
verifies that these privileges do not disappear after a Sentry update.

Change-Id: Ide3dfa601fcf77f5acc6adce9bea443aea600901
Reviewed-on: http://gerrit.cloudera.org:8080/7332
Reviewed-by: Matthew Jacobs <mj...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/322ccb0e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/322ccb0e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/322ccb0e

Branch: refs/heads/master
Commit: 322ccb0e4918623dad4edae68ad422018e58c7b5
Parents: daff8eb
Author: aphadke <ap...@cloudera.com>
Authored: Tue Jun 27 12:14:04 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Fri Jul 21 19:16:57 2017 +0000

----------------------------------------------------------------------
 be/src/catalog/catalog.cc                       |   3 +
 be/src/util/backend-gflag-util.cc               |   2 +
 common/thrift/BackendGflags.thrift              |   6 +-
 .../impala/catalog/AuthorizationPolicy.java     |  11 +-
 .../apache/impala/catalog/RolePrivilege.java    |  34 ++++--
 .../apache/impala/service/BackendConfig.java    |   3 +
 .../org/apache/impala/util/SentryProxy.java     |   7 +-
 .../queries/QueryTest/grant_revoke.test         | 100 ++++++++--------
 tests/authorization/test_grant_revoke.py        | 115 +++++++++++++++----
 9 files changed, 188 insertions(+), 93 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/322ccb0e/be/src/catalog/catalog.cc
----------------------------------------------------------------------
diff --git a/be/src/catalog/catalog.cc b/be/src/catalog/catalog.cc
index 3ed8d7f..be7b93f 100644
--- a/be/src/catalog/catalog.cc
+++ b/be/src/catalog/catalog.cc
@@ -38,6 +38,9 @@ DEFINE_int32(num_metadata_loading_threads, 16,
 DEFINE_int32(initial_hms_cnxn_timeout_s, 120,
     "Number of seconds catalogd will wait to establish an initial connection to the HMS "
     "before exiting.");
+DEFINE_int64(sentry_catalog_polling_frequency_s, 60,
+    "Frequency (in seconds) at which the catalogd polls the sentry service to update "
+    "any policy changes.");
 DEFINE_string(sentry_config, "", "Local path to a sentry-site.xml configuration "
     "file. If set, authorization will be enabled.");
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/322ccb0e/be/src/util/backend-gflag-util.cc
----------------------------------------------------------------------
diff --git a/be/src/util/backend-gflag-util.cc b/be/src/util/backend-gflag-util.cc
index ab66a73..5523735 100644
--- a/be/src/util/backend-gflag-util.cc
+++ b/be/src/util/backend-gflag-util.cc
@@ -33,6 +33,7 @@ DECLARE_int32(read_size);
 DECLARE_int32(num_metadata_loading_threads);
 DECLARE_int32(initial_hms_cnxn_timeout_s);
 DECLARE_int32(kudu_operation_timeout_ms);
+DECLARE_int64(sentry_catalog_polling_frequency_s);
 DECLARE_int64(inc_stats_size_limit_bytes);
 DECLARE_string(principal);
 DECLARE_string(lineage_event_log_dir);
@@ -73,6 +74,7 @@ Status GetThriftBackendGflags(JNIEnv* jni_env, jbyteArray* cfg_bytes) {
   cfg.__set_local_library_path(FLAGS_local_library_dir);
   cfg.__set_kudu_operation_timeout_ms(FLAGS_kudu_operation_timeout_ms);
   cfg.__set_enable_partitioned_hash_join(FLAGS_enable_partitioned_hash_join);
+  cfg.__set_sentry_catalog_polling_frequency_s(FLAGS_sentry_catalog_polling_frequency_s);
   RETURN_IF_ERROR(SerializeThriftMsg(jni_env, &cfg, cfg_bytes));
   return Status::OK();
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/322ccb0e/common/thrift/BackendGflags.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/BackendGflags.thrift b/common/thrift/BackendGflags.thrift
index 3a410c2..b5f6838 100644
--- a/common/thrift/BackendGflags.thrift
+++ b/common/thrift/BackendGflags.thrift
@@ -31,8 +31,6 @@ struct TBackendGflags {
 
   5: required i64 inc_stats_size_limit_bytes
 
- 19: required bool enable_stats_extrapolation
-
   6: required string lineage_event_log_dir
 
   7: required bool load_catalog_in_background
@@ -58,4 +56,8 @@ struct TBackendGflags {
   17: required i32 initial_hms_cnxn_timeout_s
 
   18: required bool enable_partitioned_hash_join
+
+  19: required bool enable_stats_extrapolation
+
+  20: required i64 sentry_catalog_polling_frequency_s
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/322ccb0e/fe/src/main/java/org/apache/impala/catalog/AuthorizationPolicy.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/AuthorizationPolicy.java b/fe/src/main/java/org/apache/impala/catalog/AuthorizationPolicy.java
index 56fc2f7..624caad 100644
--- a/fe/src/main/java/org/apache/impala/catalog/AuthorizationPolicy.java
+++ b/fe/src/main/java/org/apache/impala/catalog/AuthorizationPolicy.java
@@ -328,12 +328,13 @@ public class AuthorizationPolicy implements PrivilegeCache {
         if (!privName.equalsIgnoreCase(privilege.getPrivilege_name())) continue;
       }
       TResultRowBuilder rowBuilder = new TResultRowBuilder();
-      rowBuilder.add(privilege.getScope().toString());
-      rowBuilder.add(Strings.nullToEmpty(privilege.getDb_name()));
-      rowBuilder.add(Strings.nullToEmpty(privilege.getTable_name()));
-      rowBuilder.add(Strings.nullToEmpty(privilege.getColumn_name()));
+      rowBuilder.add(privilege.getScope().toString().toLowerCase());
+      rowBuilder.add(Strings.nullToEmpty(privilege.getDb_name()).toLowerCase());
+      rowBuilder.add(Strings.nullToEmpty(privilege.getTable_name()).toLowerCase());
+      rowBuilder.add(Strings.nullToEmpty(privilege.getColumn_name()).toLowerCase());
+      // URIs are case sensitive
       rowBuilder.add(Strings.nullToEmpty(privilege.getUri()));
-      rowBuilder.add(privilege.getPrivilege_level().toString());
+      rowBuilder.add(privilege.getPrivilege_level().toString().toLowerCase());
       rowBuilder.add(Boolean.toString(privilege.isHas_grant_opt()));
       if (privilege.getCreate_time_ms() == -1) {
         rowBuilder.add(null);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/322ccb0e/fe/src/main/java/org/apache/impala/catalog/RolePrivilege.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/RolePrivilege.java b/fe/src/main/java/org/apache/impala/catalog/RolePrivilege.java
index 6ce035f..87277af 100644
--- a/fe/src/main/java/org/apache/impala/catalog/RolePrivilege.java
+++ b/fe/src/main/java/org/apache/impala/catalog/RolePrivilege.java
@@ -66,30 +66,42 @@ public class RolePrivilege implements CatalogObject {
       Preconditions.checkNotNull(scope);
       switch (scope) {
         case SERVER: {
-          authorizable.add(KV_JOINER.join("server", privilege.getServer_name()));
+          authorizable.add(KV_JOINER.join("server", privilege.getServer_name().
+              toLowerCase()));
           break;
         }
         case URI: {
-          authorizable.add(KV_JOINER.join("server", privilege.getServer_name()));
+          authorizable.add(KV_JOINER.join("server", privilege.getServer_name().
+              toLowerCase()));
+          // (IMPALA-2695) URIs are case sensitive
           authorizable.add(KV_JOINER.join("uri", privilege.getUri()));
           break;
         }
         case DATABASE: {
-          authorizable.add(KV_JOINER.join("server", privilege.getServer_name()));
-          authorizable.add(KV_JOINER.join("db", privilege.getDb_name()));
+          authorizable.add(KV_JOINER.join("server", privilege.getServer_name().
+              toLowerCase()));
+          authorizable.add(KV_JOINER.join("db", privilege.getDb_name().
+              toLowerCase()));
           break;
         }
         case TABLE: {
-          authorizable.add(KV_JOINER.join("server", privilege.getServer_name()));
-          authorizable.add(KV_JOINER.join("db", privilege.getDb_name()));
-          authorizable.add(KV_JOINER.join("table", privilege.getTable_name()));
+          authorizable.add(KV_JOINER.join("server", privilege.getServer_name().
+              toLowerCase()));
+          authorizable.add(KV_JOINER.join("db", privilege.getDb_name().
+              toLowerCase()));
+          authorizable.add(KV_JOINER.join("table", privilege.getTable_name().
+              toLowerCase()));
           break;
         }
         case COLUMN: {
-          authorizable.add(KV_JOINER.join("server", privilege.getServer_name()));
-          authorizable.add(KV_JOINER.join("db", privilege.getDb_name()));
-          authorizable.add(KV_JOINER.join("table", privilege.getTable_name()));
-          authorizable.add(KV_JOINER.join("column", privilege.getColumn_name()));
+          authorizable.add(KV_JOINER.join("server", privilege.getServer_name().
+              toLowerCase()));
+          authorizable.add(KV_JOINER.join("db", privilege.getDb_name().
+              toLowerCase()));
+          authorizable.add(KV_JOINER.join("table", privilege.getTable_name().
+              toLowerCase()));
+          authorizable.add(KV_JOINER.join("column", privilege.getColumn_name().
+              toLowerCase()));
           break;
         }
         default: {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/322ccb0e/fe/src/main/java/org/apache/impala/service/BackendConfig.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/service/BackendConfig.java b/fe/src/main/java/org/apache/impala/service/BackendConfig.java
index a1566c7..5b641e2 100644
--- a/fe/src/main/java/org/apache/impala/service/BackendConfig.java
+++ b/fe/src/main/java/org/apache/impala/service/BackendConfig.java
@@ -60,6 +60,9 @@ public class BackendConfig {
 
   public int getImpalaLogLevel() { return backendCfg_.impala_log_lvl; }
   public int getNonImpalaJavaVlogLevel() { return backendCfg_.non_impala_java_vlog; }
+  public long getSentryCatalogPollingFrequency() { // Poll interval, in seconds.
+    return backendCfg_.sentry_catalog_polling_frequency_s;
+  }
 
   public boolean isPartitionedHashJoinEnabled() {
     return backendCfg_.enable_partitioned_hash_join;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/322ccb0e/fe/src/main/java/org/apache/impala/util/SentryProxy.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/util/SentryProxy.java b/fe/src/main/java/org/apache/impala/util/SentryProxy.java
index 07669ec..642a778 100644
--- a/fe/src/main/java/org/apache/impala/util/SentryProxy.java
+++ b/fe/src/main/java/org/apache/impala/util/SentryProxy.java
@@ -37,6 +37,7 @@ import org.apache.impala.catalog.Role;
 import org.apache.impala.catalog.RolePrivilege;
 import org.apache.impala.common.ImpalaException;
 import org.apache.impala.common.ImpalaRuntimeException;
+import org.apache.impala.service.BackendConfig;
 import org.apache.impala.thrift.TPrivilege;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
@@ -85,9 +86,9 @@ public class SentryProxy {
       processUser_ = new User(kerberosPrincipal);
     }
     sentryPolicyService_ = new SentryPolicyService(sentryConfig);
-    // Sentry Service is enabled.
-    // TODO: Make this configurable
-    policyReader_.scheduleAtFixedRate(new PolicyReader(), 0, 60,
+
+    policyReader_.scheduleAtFixedRate(new PolicyReader(), 0,
+        BackendConfig.INSTANCE.getSentryCatalogPollingFrequency(),
         TimeUnit.SECONDS);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/322ccb0e/testdata/workloads/functional-query/queries/QueryTest/grant_revoke.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/grant_revoke.test b/testdata/workloads/functional-query/queries/QueryTest/grant_revoke.test
index 8beb04f..3f219c5 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/grant_revoke.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/grant_revoke.test
@@ -59,7 +59,7 @@ STRING
 ---- QUERY
 show grant role grant_revoke_test_ALL_SERVER
 ---- RESULTS
-'SERVER','','','','','ALL',FALSE,regex:.+
+'server','','','','','all',false,regex:.+
 ---- LABELS
 scope, database, table, column, uri, privilege, grant_option, create_time
 ---- TYPES
@@ -68,7 +68,7 @@ STRING, STRING, STRING, STRING, STRING, STRING, BOOLEAN, STRING
 ---- QUERY
 show grant role grant_revoke_test_ALL_SERVER on server
 ---- RESULTS
-'SERVER','','','','','ALL',FALSE,regex:.+
+'server','','','','','all',false,regex:.+
 ---- LABELS
 scope, database, table, column, uri, privilege, grant_option, create_time
 ---- TYPES
@@ -262,9 +262,9 @@ STRING, BIGINT, BIGINT, STRING, STRING, STRING, STRING, STRING, STRING
 ---- QUERY
 show grant role grant_revoke_test_ALL_URI
 ---- RESULTS
-'URI','','','','$NAMENODE/test-warehouse/grant_rev_test_tbl2','ALL',FALSE,regex:.+
-'URI','','','','$NAMENODE/test-warehouse/GRANT_REV_TEST_TBL3','ALL',FALSE,regex:.+
-'URI','','','','$NAMENODE/test-warehouse/grant_rev_test_prt','ALL',FALSE,regex:.+
+'uri','','','','$NAMENODE/test-warehouse/grant_rev_test_tbl2','all',false,regex:.+
+'uri','','','','$NAMENODE/test-warehouse/GRANT_REV_TEST_TBL3','all',false,regex:.+
+'uri','','','','$NAMENODE/test-warehouse/grant_rev_test_prt','all',false,regex:.+
 ---- LABELS
 scope, database, table, column, uri, privilege, grant_option, create_time
 ---- TYPES
@@ -313,8 +313,8 @@ GRANT INSERT ON TABLE grant_rev_db.test_tbl1 TO grant_revoke_test_SELECT_INSERT_
 ---- QUERY
 show grant role grant_revoke_test_SELECT_INSERT_TEST_TBL on table grant_rev_db.test_tbl1
 ---- RESULTS
-'TABLE','grant_rev_db','test_tbl1','','','SELECT',FALSE,regex:.+
-'TABLE','grant_rev_db','test_tbl1','','','INSERT',FALSE,regex:.+
+'table','grant_rev_db','test_tbl1','','','select',false,regex:.+
+'table','grant_rev_db','test_tbl1','','','insert',false,regex:.+
 ---- LABELS
 scope, database, table, column, uri, privilege, grant_option, create_time
 ---- TYPES
@@ -415,8 +415,8 @@ root
 ---- QUERY
 show grant role grant_revoke_test_ROOT
 ---- RESULTS
-'DATABASE','functional','','','','ALL',TRUE,regex:.+
-'TABLE','functional','alltypes','','','ALL',FALSE,regex:.+
+'database','functional','','','','all',true,regex:.+
+'table','functional','alltypes','','','all',false,regex:.+
 ---- LABELS
 scope, database, table, column, uri, privilege, grant_option, create_time
 ---- TYPES
@@ -477,7 +477,7 @@ STRING,STRING
 # Privilege still exists, but grant option is set to false
 show grant role grant_revoke_test_ROOT
 ---- RESULTS
-'DATABASE','functional','','','','ALL',FALSE,regex:.+
+'database','functional','','','','all',false,regex:.+
 ---- LABELS
 scope, database, table, column, uri, privilege, grant_option, create_time
 ---- TYPES
@@ -513,10 +513,10 @@ GRANT SELECT (a, b, x) ON TABLE grant_rev_db.test_tbl3 TO grant_revoke_test_ALL_
 ---- QUERY
 show grant role grant_revoke_test_ALL_SERVER
 ---- RESULTS: VERIFY_IS_EQUAL_SORTED
-'SERVER','','','','','ALL',FALSE,regex:.+
-'COLUMN','grant_rev_db','test_tbl3','a','','SELECT',FALSE,regex:.+
-'COLUMN','grant_rev_db','test_tbl3','b','','SELECT',FALSE,regex:.+
-'COLUMN','grant_rev_db','test_tbl3','x','','SELECT',FALSE,regex:.+
+'server','','','','','all',false,regex:.+
+'column','grant_rev_db','test_tbl3','a','','select',false,regex:.+
+'column','grant_rev_db','test_tbl3','b','','select',false,regex:.+
+'column','grant_rev_db','test_tbl3','x','','select',false,regex:.+
 ---- LABELS
 scope, database, table, column, uri, privilege, grant_option, create_time
 ---- TYPES
@@ -529,13 +529,13 @@ GRANT SELECT (c, d, y) ON TABLE grant_rev_db.test_tbl3 TO grant_revoke_test_ALL_
 ---- QUERY
 show grant role grant_revoke_test_ALL_SERVER
 ---- RESULTS: VERIFY_IS_EQUAL_SORTED
-'SERVER','','','','','ALL',FALSE,regex:.+
-'COLUMN','grant_rev_db','test_tbl3','a','','SELECT',FALSE,regex:.+
-'COLUMN','grant_rev_db','test_tbl3','b','','SELECT',FALSE,regex:.+
-'COLUMN','grant_rev_db','test_tbl3','c','','SELECT',FALSE,regex:.+
-'COLUMN','grant_rev_db','test_tbl3','d','','SELECT',FALSE,regex:.+
-'COLUMN','grant_rev_db','test_tbl3','x','','SELECT',FALSE,regex:.+
-'COLUMN','grant_rev_db','test_tbl3','y','','SELECT',FALSE,regex:.+
+'server','','','','','all',false,regex:.+
+'column','grant_rev_db','test_tbl3','a','','select',false,regex:.+
+'column','grant_rev_db','test_tbl3','b','','select',false,regex:.+
+'column','grant_rev_db','test_tbl3','c','','select',false,regex:.+
+'column','grant_rev_db','test_tbl3','d','','select',false,regex:.+
+'column','grant_rev_db','test_tbl3','x','','select',false,regex:.+
+'column','grant_rev_db','test_tbl3','y','','select',false,regex:.+
 ---- LABELS
 scope, database, table, column, uri, privilege, grant_option, create_time
 ---- TYPES
@@ -548,14 +548,14 @@ GRANT SELECT (a, a, e, x) ON TABLE grant_rev_db.test_tbl3 TO grant_revoke_test_A
 ---- QUERY
 show grant role grant_revoke_test_ALL_SERVER
 ---- RESULTS: VERIFY_IS_EQUAL_SORTED
-'SERVER','','','','','ALL',FALSE,regex:.+
-'COLUMN','grant_rev_db','test_tbl3','a','','SELECT',FALSE,regex:.+
-'COLUMN','grant_rev_db','test_tbl3','b','','SELECT',FALSE,regex:.+
-'COLUMN','grant_rev_db','test_tbl3','c','','SELECT',FALSE,regex:.+
-'COLUMN','grant_rev_db','test_tbl3','d','','SELECT',FALSE,regex:.+
-'COLUMN','grant_rev_db','test_tbl3','e','','SELECT',FALSE,regex:.+
-'COLUMN','grant_rev_db','test_tbl3','x','','SELECT',FALSE,regex:.+
-'COLUMN','grant_rev_db','test_tbl3','y','','SELECT',FALSE,regex:.+
+'server','','','','','all',false,regex:.+
+'column','grant_rev_db','test_tbl3','a','','select',false,regex:.+
+'column','grant_rev_db','test_tbl3','b','','select',false,regex:.+
+'column','grant_rev_db','test_tbl3','c','','select',false,regex:.+
+'column','grant_rev_db','test_tbl3','d','','select',false,regex:.+
+'column','grant_rev_db','test_tbl3','e','','select',false,regex:.+
+'column','grant_rev_db','test_tbl3','x','','select',false,regex:.+
+'column','grant_rev_db','test_tbl3','y','','select',false,regex:.+
 ---- LABELS
 scope, database, table, column, uri, privilege, grant_option, create_time
 ---- TYPES
@@ -569,11 +569,11 @@ REVOKE SELECT (a, b, b, y) ON TABLE grant_rev_db.test_tbl3 FROM grant_revoke_tes
 ---- QUERY
 show grant role grant_revoke_test_ALL_SERVER
 ---- RESULTS: VERIFY_IS_EQUAL_SORTED
-'SERVER','','','','','ALL',FALSE,regex:.+
-'COLUMN','grant_rev_db','test_tbl3','c','','SELECT',FALSE,regex:.+
-'COLUMN','grant_rev_db','test_tbl3','d','','SELECT',FALSE,regex:.+
-'COLUMN','grant_rev_db','test_tbl3','e','','SELECT',FALSE,regex:.+
-'COLUMN','grant_rev_db','test_tbl3','x','','SELECT',FALSE,regex:.+
+'server','','','','','all',false,regex:.+
+'column','grant_rev_db','test_tbl3','c','','select',false,regex:.+
+'column','grant_rev_db','test_tbl3','d','','select',false,regex:.+
+'column','grant_rev_db','test_tbl3','e','','select',false,regex:.+
+'column','grant_rev_db','test_tbl3','x','','select',false,regex:.+
 ---- LABELS
 scope, database, table, column, uri, privilege, grant_option, create_time
 ---- TYPES
@@ -586,9 +586,9 @@ REVOKE SELECT (a, b, c, x) ON TABLE grant_rev_db.test_tbl3 FROM grant_revoke_tes
 ---- QUERY
 show grant role grant_revoke_test_ALL_SERVER
 ---- RESULTS: VERIFY_IS_EQUAL_SORTED
-'SERVER','','','','','ALL',FALSE,regex:.+
-'COLUMN','grant_rev_db','test_tbl3','d','','SELECT',FALSE,regex:.+
-'COLUMN','grant_rev_db','test_tbl3','e','','SELECT',FALSE,regex:.+
+'server','','','','','all',false,regex:.+
+'column','grant_rev_db','test_tbl3','d','','select',false,regex:.+
+'column','grant_rev_db','test_tbl3','e','','select',false,regex:.+
 ---- LABELS
 scope, database, table, column, uri, privilege, grant_option, create_time
 ---- TYPES
@@ -601,7 +601,7 @@ REVOKE SELECT (a, b, c, d, e) ON TABLE grant_rev_db.test_tbl3 FROM grant_revoke_
 ---- QUERY
 show grant role grant_revoke_test_ALL_SERVER
 ---- RESULTS: VERIFY_IS_EQUAL_SORTED
-'SERVER','','','','','ALL',FALSE,regex:.+
+'server','','','','','all',false,regex:.+
 ---- LABELS
 scope, database, table, column, uri, privilege, grant_option, create_time
 ---- TYPES
@@ -619,7 +619,7 @@ root
 ---- QUERY
 show grant role grant_revoke_test_ROOT
 ---- RESULTS: VERIFY_IS_EQUAL_SORTED
-'TABLE','grant_rev_db','test_tbl3','','','SELECT',FALSE,regex:.+
+'table','grant_rev_db','test_tbl3','','','select',false,regex:.+
 ---- LABELS
 scope, database, table, column, uri, privilege, grant_option, create_time
 ---- TYPES
@@ -652,8 +652,8 @@ root
 ---- QUERY
 show grant role grant_revoke_test_ROOT
 ---- RESULTS: VERIFY_IS_EQUAL_SORTED
-'TABLE','grant_rev_db','test_tbl3','','','SELECT',TRUE,regex:.+
-'COLUMN','grant_rev_db','test_tbl3','a','','SELECT',FALSE,regex:.+
+'table','grant_rev_db','test_tbl3','','','select',true,regex:.+
+'column','grant_rev_db','test_tbl3','a','','select',false,regex:.+
 ---- LABELS
 scope, database, table, column, uri, privilege, grant_option, create_time
 ---- TYPES
@@ -666,10 +666,10 @@ GRANT SELECT (a, c, e) ON TABLE grant_rev_db.test_tbl3 TO grant_revoke_test_ALL_
 ---- QUERY
 show grant role grant_revoke_test_ALL_SERVER
 ---- RESULTS: VERIFY_IS_EQUAL_SORTED
-'SERVER','','','','','ALL',FALSE,regex:.+
-'COLUMN','grant_rev_db','test_tbl3','a','','SELECT',TRUE,regex:.+
-'COLUMN','grant_rev_db','test_tbl3','c','','SELECT',TRUE,regex:.+
-'COLUMN','grant_rev_db','test_tbl3','e','','SELECT',TRUE,regex:.+
+'server','','','','','all',false,regex:.+
+'column','grant_rev_db','test_tbl3','a','','select',true,regex:.+
+'column','grant_rev_db','test_tbl3','c','','select',true,regex:.+
+'column','grant_rev_db','test_tbl3','e','','select',true,regex:.+
 ---- LABELS
 scope, database, table, column, uri, privilege, grant_option, create_time
 ---- TYPES
@@ -683,10 +683,10 @@ REVOKE GRANT OPTION FOR SELECT (a, c) ON TABLE grant_rev_db.test_tbl3 FROM grant
 # TODO: Add a test case that exercises the cascading effect of REVOKE ALL.
 show grant role grant_revoke_test_ALL_SERVER
 ---- RESULTS: VERIFY_IS_EQUAL_SORTED
-'SERVER','','','','','ALL',FALSE,regex:.+
-'COLUMN','grant_rev_db','test_tbl3','a','','SELECT',FALSE,regex:.+
-'COLUMN','grant_rev_db','test_tbl3','c','','SELECT',FALSE,regex:.+
-'COLUMN','grant_rev_db','test_tbl3','e','','SELECT',TRUE,regex:.+
+'server','','','','','all',false,regex:.+
+'column','grant_rev_db','test_tbl3','a','','select',false,regex:.+
+'column','grant_rev_db','test_tbl3','c','','select',false,regex:.+
+'column','grant_rev_db','test_tbl3','e','','select',true,regex:.+
 ---- LABELS
 scope, database, table, column, uri, privilege, grant_option, create_time
 ---- TYPES
@@ -723,7 +723,7 @@ does not have privileges to execute 'CREATE' on: grant_rev_db
 ---- QUERY
 show grant role grant_revoke_test_ALL_SERVER1
 ---- RESULTS: VERIFY_IS_EQUAL_SORTED
-'SERVER','','','','','ALL',FALSE,regex:.+
+'server','','','','','all',false,regex:.+
 ---- LABELS
 scope, database, table, column, uri, privilege, grant_option, create_time
 ---- TYPES

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/322ccb0e/tests/authorization/test_grant_revoke.py
----------------------------------------------------------------------
diff --git a/tests/authorization/test_grant_revoke.py b/tests/authorization/test_grant_revoke.py
index 987aecb..9430350 100644
--- a/tests/authorization/test_grant_revoke.py
+++ b/tests/authorization/test_grant_revoke.py
@@ -26,6 +26,7 @@ from time import sleep
 from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
 from tests.common.impala_test_suite import ImpalaTestSuite
 from tests.common.test_dimensions import create_uncompressed_text_dimension
+from tests.util.calculation_util import get_random_id
 from tests.verifiers.metric_verifier import MetricVerifier
 
 SENTRY_CONFIG_FILE = getenv('IMPALA_HOME') + '/fe/src/test/resources/sentry-site.xml'
@@ -98,6 +99,72 @@ class TestGrantRevoke(CustomClusterTestSuite, ImpalaTestSuite):
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(
       impalad_args="--server_name=server1",
+      catalogd_args="--sentry_config=" + SENTRY_CONFIG_FILE +
+                    " --sentry_catalog_polling_frequency_s=1",
+      statestored_args="--statestore_update_frequency_ms=300")
+  def test_role_privilege_case(self, vector):
+    """IMPALA-5582: Store sentry privileges in lower case. This
+    test grants privileges to a role on tables and databases whose
+    names are in lower, upper and mixed case, and verifies that
+    these privileges do not vanish after a sentryProxy polling
+    update.
+    """
+    db_name = "test_role_privilege_case_x_" + get_random_id(5)
+    db_name_upper_case = "TEST_ROLE_PRIVILEGE_CASE_Y_" + get_random_id(5).upper()
+    db_name_mixed_case = "TesT_Role_PRIVIlege_case_z" + get_random_id(5)
+    role_name = "test_role_" + get_random_id(5)
+    try:
+      self.client.execute("create role {0}".format(role_name))
+      self.client.execute("grant all on server to {0}".format(role_name))
+      self.client.execute(
+          "grant role {0} to group {1}".format(
+           role_name, grp.getgrnam(getuser()).gr_name))
+
+      self.client.execute("create database " + db_name)
+      self.client.execute("create database " + db_name_upper_case)
+      self.client.execute("create database " + db_name_mixed_case)
+      self.client.execute(
+          "create table if not exists {0}.test1(i int)".format(db_name))
+      self.client.execute("create table if not exists {0}.TEST2(i int)".format(db_name))
+      self.client.execute("create table if not exists {0}.Test3(i int)".format(db_name))
+
+      self.client.execute(
+          "grant select on table {0}.test1 to {1}".format(db_name, role_name))
+      self.client.execute(
+          "grant select on table {0}.TEST2 to {1}".format(db_name, role_name))
+      self.client.execute(
+          "grant select on table {0}.TesT3 to {1}".format(db_name, role_name))
+      self.client.execute("grant all on database {0} to {1}".format(db_name, role_name))
+      self.client.execute(
+          "grant all on database {0} to {1}".format(db_name_upper_case, role_name))
+      self.client.execute(
+          "grant all on database {0} to {1}".format(db_name_mixed_case, role_name))
+      result = self.client.execute("show grant role {0}".format(role_name))
+      assert any('test1' in x for x in result.data)
+      assert any('test2' in x for x in result.data)
+      assert any('test3' in x for x in result.data)
+      assert any(db_name_upper_case.lower() in x for x in result.data)
+      assert any(db_name_mixed_case.lower() in x for x in result.data)
+      # Sleep for 2 seconds and make sure that the privileges
+      # on all 3 tables and the databases survive a sentryProxy
+      # polling update (sentry_catalog_polling_frequency_s is
+      # set to 1 second).
+      sleep(2)
+      result = self.client.execute("show grant role {0}".format(role_name))
+      assert any('test1' in x for x in result.data)
+      assert any('test2' in x for x in result.data)
+      assert any('test3' in x for x in result.data)
+      assert any(db_name_upper_case.lower() in x for x in result.data)
+      assert any(db_name_mixed_case.lower() in x for x in result.data)
+    finally:
+      self.client.execute("drop database if exists {0}".format(db_name_upper_case))
+      self.client.execute("drop database if exists {0}".format(db_name_mixed_case))
+      self.client.execute("drop database if exists {0} cascade".format(db_name))
+      self.client.execute("drop role {0}".format(role_name))
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(
+      impalad_args="--server_name=server1",
       catalogd_args="--sentry_config=" + SENTRY_CONFIG_FILE,
       statestored_args=("--statestore_heartbeat_frequency_ms=300 "
                         "--statestore_update_frequency_ms=300"))
@@ -106,25 +173,29 @@ class TestGrantRevoke(CustomClusterTestSuite, ImpalaTestSuite):
     reverse order if a role was modified, but not the associated privilege. Verify that
     Impala is able to handle this.
     """
-    self.client.execute("create role test_role")
-    self.client.execute("grant all on server to test_role")
-    # Wait a few seconds to make sure the update propagates to the statestore.
-    sleep(3)
-    # Update the role, increasing its catalog verion.
-    self.client.execute("grant role test_role to group {0}".format(
-        grp.getgrnam(getuser()).gr_name))
-    result = self.client.execute("show tables in functional")
-    assert 'alltypes' in result.data
-    privileges_before = self.client.execute("show grant role test_role")
-    # Wait a few seconds before restarting Impalad to make sure that the Catalog gets
-    # updated.
-    sleep(3)
-    self.restart_first_impalad()
-    verifier = MetricVerifier(self.cluster.impalads[0].service)
-    verifier.wait_for_metric("catalog.ready", True)
-    # Verify that we still have the right privileges after the first impalad was
-    # restarted.
-    result = self.client.execute("show tables in functional");
-    assert 'alltypes' in result.data
-    privileges_after = self.client.execute("show grant role test_role")
-    assert privileges_before.data == privileges_after.data
+    role_name = "test_role_" + get_random_id(5)
+    try:
+      self.client.execute("create role {0}".format(role_name))
+      self.client.execute("grant all on server to {0}".format(role_name))
+      # Wait a few seconds to make sure the update propagates to the statestore.
+      sleep(3)
+      # Update the role, increasing its catalog version.
+      self.client.execute("grant role {0} to group {1}".format(
+          role_name, grp.getgrnam(getuser()).gr_name))
+      result = self.client.execute("show tables in functional")
+      assert 'alltypes' in result.data
+      privileges_before = self.client.execute("show grant role {0}".format(role_name))
+      # Wait a few seconds before restarting Impalad to make sure that the Catalog gets
+      # updated.
+      sleep(3)
+      self.restart_first_impalad()
+      verifier = MetricVerifier(self.cluster.impalads[0].service)
+      verifier.wait_for_metric("catalog.ready", True)
+      # Verify that we still have the right privileges after the first impalad was
+      # restarted.
+      result = self.client.execute("show tables in functional")
+      assert 'alltypes' in result.data
+      privileges_after = self.client.execute("show grant role {0}".format(role_name))
+      assert privileges_before.data == privileges_after.data
+    finally:
+      self.client.execute("drop role {0}".format(role_name))


[2/3] incubator-impala git commit: IMPALA-5167: Reduce the number of Kudu clients created (FE)

Posted by mj...@apache.org.
IMPALA-5167: Reduce the number of Kudu clients created (FE)

Creating Kudu clients is very expensive as each will fetch
metadata from the Kudu master, so we should minimize the
number of Kudu clients that get created.

This patch stores a map from Kudu master addresses to Kudu
clients in KuduUtil to be used across the FE and catalog.
Another patch has already addressed the BE.

Future work will consider providing a way to invalidate
the stored Kudu clients in case something goes wrong
(IMPALA-5685)

This relies on two changes on the Kudu side: one that clears
non-covered range entries from the client's cache on table
open (d07ecd6ded01201c912d2e336611a6a941f48d98), and one
that automatically refreshes auth tokens when they expire
(603c1578c78c0377ffafdd9c427ebfd8a206bda3).

This patch disables some tests that no longer work as
they relied on Kudu metadata loading operations timing out,
but since we're reusing clients the metadata is already
loaded when the test is run.

Testing:
- Ran a stress test on a 10 node cluster: scan of a small
  Kudu table, 1000 concurrent queries, load on the Kudu
  master was reduced significantly, from ~50% cpu to ~5%.
  (with the BE changes included)
- Ran the Kudu e2e tests.
- Manually ran a test with concurrent INSERTs and
  'ALTER TABLE ADD PARTITION' (which is affected by the
  Kudu side change mentioned above) and verified
  correctness.

Change-Id: I9b0b346f37ee43f7f0eefe34a093eddbbdcf2a5e
Reviewed-on: http://gerrit.cloudera.org:8080/6898
Reviewed-by: Thomas Tauber-Marshall <tm...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/399b184b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/399b184b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/399b184b

Branch: refs/heads/master
Commit: 399b184bbcf5a1fb06b5afbebf9062e69d02beed
Parents: 322ccb0
Author: Thomas Tauber-Marshall <tm...@cloudera.com>
Authored: Tue May 16 09:37:03 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Fri Jul 21 21:49:04 2017 +0000

----------------------------------------------------------------------
 .../org/apache/impala/catalog/KuduTable.java    |  9 ++++--
 .../org/apache/impala/planner/KuduScanNode.java |  3 +-
 .../impala/service/KuduCatalogOpExecutor.java   | 18 +++++++----
 .../java/org/apache/impala/util/KuduUtil.java   | 32 +++++++++++++-------
 .../QueryTest/kudu-timeouts-catalogd.test       | 12 +++++---
 .../QueryTest/kudu-timeouts-impalad.test        | 12 +++++---
 6 files changed, 55 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/399b184b/fe/src/main/java/org/apache/impala/catalog/KuduTable.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/KuduTable.java b/fe/src/main/java/org/apache/impala/catalog/KuduTable.java
index a79ffe0..cb94503 100644
--- a/fe/src/main/java/org/apache/impala/catalog/KuduTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/KuduTable.java
@@ -204,7 +204,8 @@ public class KuduTable extends Table {
     setTableStats(msTable_);
 
     // Connect to Kudu to retrieve table metadata
-    try (KuduClient kuduClient = KuduUtil.createKuduClient(getKuduMasterHosts())) {
+    KuduClient kuduClient = KuduUtil.getKuduClient(getKuduMasterHosts());
+    try {
       kuduTable = kuduClient.openTable(kuduTableName_);
     } catch (KuduException e) {
       throw new TableLoadingException(String.format(
@@ -389,7 +390,8 @@ public class KuduTable extends Table {
     resultSchema.addToColumns(new TColumn("Leader Replica", Type.STRING.toThrift()));
     resultSchema.addToColumns(new TColumn("# Replicas", Type.INT.toThrift()));
 
-    try (KuduClient client = KuduUtil.createKuduClient(getKuduMasterHosts())) {
+    KuduClient client = KuduUtil.getKuduClient(getKuduMasterHosts());
+    try {
       org.apache.kudu.client.KuduTable kuduTable = client.openTable(kuduTableName_);
       List<LocatedTablet> tablets =
           kuduTable.getTabletsLocations(BackendConfig.INSTANCE.getKuduClientTimeoutMs());
@@ -432,7 +434,8 @@ public class KuduTable extends Table {
     // Build column header
     String header = "RANGE (" + Joiner.on(',').join(getRangePartitioningColNames()) + ")";
     resultSchema.addToColumns(new TColumn(header, Type.STRING.toThrift()));
-    try (KuduClient client = KuduUtil.createKuduClient(getKuduMasterHosts())) {
+    KuduClient client = KuduUtil.getKuduClient(getKuduMasterHosts());
+    try {
       org.apache.kudu.client.KuduTable kuduTable = client.openTable(kuduTableName_);
       // The Kudu table API will return the partitions in sorted order by value.
       List<String> partitions = kuduTable.getFormattedRangePartitions(

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/399b184b/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java b/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
index 4687129..57403e4 100644
--- a/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
@@ -111,7 +111,8 @@ public class KuduScanNode extends ScanNode {
   public void init(Analyzer analyzer) throws ImpalaRuntimeException {
     conjuncts_ = orderConjunctsByCost(conjuncts_);
 
-    try (KuduClient client = KuduUtil.createKuduClient(kuduTable_.getKuduMasterHosts())) {
+    KuduClient client = KuduUtil.getKuduClient(kuduTable_.getKuduMasterHosts());
+    try {
       org.apache.kudu.client.KuduTable rpcTable =
           client.openTable(kuduTable_.getKuduTableName());
       validateSchema(rpcTable);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/399b184b/fe/src/main/java/org/apache/impala/service/KuduCatalogOpExecutor.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/service/KuduCatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/KuduCatalogOpExecutor.java
index 9984239..cbbfccf 100644
--- a/fe/src/main/java/org/apache/impala/service/KuduCatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/KuduCatalogOpExecutor.java
@@ -76,7 +76,8 @@ public class KuduCatalogOpExecutor {
       LOG.trace(String.format("Creating table '%s' in master '%s'", kuduTableName,
           masterHosts));
     }
-    try (KuduClient kudu = KuduUtil.createKuduClient(masterHosts)) {
+    KuduClient kudu = KuduUtil.getKuduClient(masterHosts);
+    try {
       // TODO: The IF NOT EXISTS case should be handled by Kudu to ensure atomicity.
       // (see KUDU-1710).
       if (kudu.tableExists(kuduTableName)) {
@@ -213,7 +214,8 @@ public class KuduCatalogOpExecutor {
       LOG.trace(String.format("Dropping table '%s' from master '%s'", tableName,
           masterHosts));
     }
-    try (KuduClient kudu = KuduUtil.createKuduClient(masterHosts)) {
+    KuduClient kudu = KuduUtil.getKuduClient(masterHosts);
+    try {
       Preconditions.checkState(!Strings.isNullOrEmpty(tableName));
       // TODO: The IF EXISTS case should be handled by Kudu to ensure atomicity.
       // (see KUDU-1710).
@@ -244,7 +246,8 @@ public class KuduCatalogOpExecutor {
       LOG.trace(String.format("Loading schema of table '%s' from master '%s'",
           kuduTableName, masterHosts));
     }
-    try (KuduClient kudu = KuduUtil.createKuduClient(masterHosts)) {
+    KuduClient kudu = KuduUtil.getKuduClient(masterHosts);
+    try {
       if (!kudu.tableExists(kuduTableName)) {
         throw new ImpalaRuntimeException(String.format("Table does not exist in Kudu: " +
             "'%s'", kuduTableName));
@@ -286,7 +289,8 @@ public class KuduCatalogOpExecutor {
     Preconditions.checkState(!Strings.isNullOrEmpty(masterHosts));
     String kuduTableName = properties.get(KuduTable.KEY_TABLE_NAME);
     Preconditions.checkState(!Strings.isNullOrEmpty(kuduTableName));
-    try (KuduClient kudu = KuduUtil.createKuduClient(masterHosts)) {
+    KuduClient kudu = KuduUtil.getKuduClient(masterHosts);
+    try {
       kudu.tableExists(kuduTableName);
     } catch (Exception e) {
       // TODO: This is misleading when there are other errors, e.g. timeouts.
@@ -305,7 +309,8 @@ public class KuduCatalogOpExecutor {
     alterTableOptions.renameTable(newName);
     String errMsg = String.format("Error renaming Kudu table " +
         "%s to %s", tbl.getKuduTableName(), newName);
-    try (KuduClient client = KuduUtil.createKuduClient(tbl.getKuduMasterHosts())) {
+    KuduClient client = KuduUtil.getKuduClient(tbl.getKuduMasterHosts());
+    try {
       client.alterTable(tbl.getKuduTableName(), alterTableOptions);
       if (!client.isAlterTableDone(newName)) {
         throw new ImpalaRuntimeException(errMsg + ": Kudu operation timed out");
@@ -475,7 +480,8 @@ public class KuduCatalogOpExecutor {
    */
   public static void alterKuduTable(KuduTable tbl, AlterTableOptions ato, String errMsg)
       throws ImpalaRuntimeException {
-    try (KuduClient client = KuduUtil.createKuduClient(tbl.getKuduMasterHosts())) {
+    KuduClient client = KuduUtil.getKuduClient(tbl.getKuduMasterHosts());
+    try {
       client.alterTable(tbl.getKuduTableName(), ato);
       if (!client.isAlterTableDone(tbl.getKuduTableName())) {
         throw new ImpalaRuntimeException(errMsg + ": Kudu operation timed out");

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/399b184b/fe/src/main/java/org/apache/impala/util/KuduUtil.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/util/KuduUtil.java b/fe/src/main/java/org/apache/impala/util/KuduUtil.java
index be98cf6..4df8005 100644
--- a/fe/src/main/java/org/apache/impala/util/KuduUtil.java
+++ b/fe/src/main/java/org/apache/impala/util/KuduUtil.java
@@ -20,6 +20,7 @@ package org.apache.impala.util;
 import static java.lang.String.format;
 
 import java.util.List;
+import java.util.Map;
 
 import org.apache.impala.analysis.Analyzer;
 import org.apache.impala.analysis.DescriptorTable;
@@ -55,6 +56,7 @@ import org.apache.kudu.client.RangePartitionBound;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 
 public class KuduUtil {
 
@@ -66,19 +68,27 @@ public class KuduUtil {
   // be sufficient for the Frontend/Catalog use, and has been tested in stress tests.
   private static int KUDU_CLIENT_WORKER_THREAD_COUNT = 5;
 
+  // Maps lists of master addresses to KuduClients, for sharing clients across the FE.
+  private static Map<String, KuduClient> kuduClients_ = Maps.newHashMap();
+
   /**
-   * Creates a KuduClient with the specified Kudu master addresses (as a comma-separated
-   * list of host:port pairs). The 'admin operation timeout' and the 'operation timeout'
-   * are set to BackendConfig.getKuduClientTimeoutMs(). The 'admin operations timeout' is
-   * used for operations like creating/deleting tables. The 'operation timeout' is used
-   * when fetching tablet metadata.
+   * Gets a KuduClient for the specified Kudu master addresses (as a comma-separated
+   * list of host:port pairs). It will look up and share an existing KuduClient, if
+   * possible, or it will create a new one to return.
+   * The 'admin operation timeout' and the 'operation timeout' are set to
+   * BackendConfig.getKuduClientTimeoutMs(). The 'admin operations timeout' is used for
+   * operations like creating/deleting tables. The 'operation timeout' is used when
+   * fetching tablet metadata.
    */
-  public static KuduClient createKuduClient(String kuduMasters) {
-    KuduClientBuilder b = new KuduClient.KuduClientBuilder(kuduMasters);
-    b.defaultAdminOperationTimeoutMs(BackendConfig.INSTANCE.getKuduClientTimeoutMs());
-    b.defaultOperationTimeoutMs(BackendConfig.INSTANCE.getKuduClientTimeoutMs());
-    b.workerCount(KUDU_CLIENT_WORKER_THREAD_COUNT);
-    return b.build();
+  public static KuduClient getKuduClient(String kuduMasters) {
+    if (!kuduClients_.containsKey(kuduMasters)) {
+      KuduClientBuilder b = new KuduClient.KuduClientBuilder(kuduMasters);
+      b.defaultAdminOperationTimeoutMs(BackendConfig.INSTANCE.getKuduClientTimeoutMs());
+      b.defaultOperationTimeoutMs(BackendConfig.INSTANCE.getKuduClientTimeoutMs());
+      b.workerCount(KUDU_CLIENT_WORKER_THREAD_COUNT);
+      kuduClients_.put(kuduMasters, b.build());
+    }
+    return kuduClients_.get(kuduMasters);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/399b184b/testdata/workloads/functional-query/queries/QueryTest/kudu-timeouts-catalogd.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/kudu-timeouts-catalogd.test b/testdata/workloads/functional-query/queries/QueryTest/kudu-timeouts-catalogd.test
index d811cfd..63af18d 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/kudu-timeouts-catalogd.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/kudu-timeouts-catalogd.test
@@ -1,3 +1,10 @@
+# TODO: enable this once we have a way to invalidate kudu clients (IMPALA-5685)
+#====
+#---- QUERY
+#show create table functional_kudu.alltypestiny
+#---- CATCH
+#Error opening Kudu table 'impala::functional_kudu.alltypestiny'
+#====
 ====
 ---- QUERY
 # TODO: improve error messages (here and below) when KUDU-1734 is resolved
@@ -6,11 +13,6 @@ describe functional_kudu.alltypes
 Error opening Kudu table 'impala::functional_kudu.alltypes'
 ====
 ---- QUERY
-show create table functional_kudu.alltypes
----- CATCH
-Error opening Kudu table 'impala::functional_kudu.alltypes'
-====
----- QUERY
 create table test_kudu (x int primary key)
 partition by hash(x) partitions 3 stored as kudu
 ---- CATCH

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/399b184b/testdata/workloads/functional-query/queries/QueryTest/kudu-timeouts-impalad.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/kudu-timeouts-impalad.test b/testdata/workloads/functional-query/queries/QueryTest/kudu-timeouts-impalad.test
index ba3341e..cde4df5 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/kudu-timeouts-impalad.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/kudu-timeouts-impalad.test
@@ -1,3 +1,10 @@
+# TODO: enable this once we have a way to invalidate kudu clients (IMPALA-5685)
+#====
+#---- QUERY
+#show table stats functional_kudu.alltypestiny
+#---- CATCH
+#Error accessing Kudu for table stats
+#====
 ====
 ---- QUERY
 # Expected timeout while planning the scan node.
@@ -6,8 +13,3 @@ select * from functional_kudu.alltypes
 ---- CATCH
 Unable to initialize the Kudu scan node
 ====
----- QUERY
-show table stats functional_kudu.alltypes
----- CATCH
-Error accessing Kudu for table stats
-====


[3/3] incubator-impala git commit: IMPALA-5498: Support for partial sorts in Kudu INSERTs

Posted by mj...@apache.org.
IMPALA-5498: Support for partial sorts in Kudu INSERTs

Impala currently supports total sorts (the entire set of data
is sorted) and top-n sorts (only the highest/lowest n elements
are sorted). This patch adds the ability to do partial sorts,
where the data is divided up into some number of subsets, each
of which is sorted individually.

It accomplishes this by adding a new exec node, PartialSortNode.
When PartialSortNode::GetNext() is called, it retrieves input
up to the query memory limit, uses the existing Sorter class to sort
it, and outputs it. This is faster than a total sort with SortNode
as it avoids the need to spill if the input is larger than the
memory limit.

Future work will look into setting a more restrictive memory limit
on the PartialSortNode. (IMPALA-5669)

In the planner, the SortNode plan node is used, with an enum value
indicating if it is a total or partial sort.

This also adds a new counter 'RunSize' to the runtime profile which
tracks the min, max, and avg size of the generated runs, in tuples.

As a first use case, partial sort replaces the total sort that was
previously used for inserts/upserts into Kudu tables. Future
work can extend this to other table sinks. (IMPALA-5649)

Testing:
- E2E test with a large INSERT into a Kudu table with a mem limit.
  Checks that no spills occurred.
- Updated planner tests.
- Existing E2E tests and stress test verify correctness of INSERT.
- Perf tests on the 10 node cluster: inserting tpch_100.lineitem
  into a Kudu table with mem_limit=3gb:
  Previously: 5 runs are spilled, sort took 7m33s
  Now: no spills, sort takes 6m19s, for ~18% speedup

Change-Id: Ieec2a15a0cc5240b1c13682067ab64670d1e0a38
Reviewed-on: http://gerrit.cloudera.org:8080/7267
Reviewed-by: Thomas Tauber-Marshall <tm...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/ad0c6e74
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/ad0c6e74
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/ad0c6e74

Branch: refs/heads/master
Commit: ad0c6e7499534d70d5b7de8e38199a9c5cfcbb48
Parents: 399b184
Author: Thomas Tauber-Marshall <tm...@cloudera.com>
Authored: Thu Jun 22 12:26:48 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Sat Jul 22 00:28:36 2017 +0000

----------------------------------------------------------------------
 be/src/exec/CMakeLists.txt                      |   1 +
 be/src/exec/exec-node.cc                        |   6 +-
 be/src/exec/partial-sort-node.cc                | 172 +++++++++++++++++++
 be/src/exec/partial-sort-node.h                 | 100 +++++++++++
 be/src/exec/sort-node.h                         |   6 +-
 be/src/runtime/sorter.cc                        |  33 ++--
 be/src/runtime/sorter.h                         |  35 +++-
 be/src/util/runtime-profile-counters.h          |   2 +
 common/thrift/PlanNodes.thrift                  |  17 +-
 .../apache/impala/planner/AnalyticPlanner.java  |   3 +-
 .../java/org/apache/impala/planner/Planner.java |  16 +-
 .../impala/planner/SingleNodePlanner.java       |   9 +-
 .../org/apache/impala/planner/SortNode.java     |  97 ++++++++---
 .../queries/PlannerTest/kudu-upsert.test        |  16 +-
 .../queries/PlannerTest/kudu.test               |  12 +-
 .../queries/QueryTest/kudu_insert.test          |   4 +-
 16 files changed, 459 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ad0c6e74/be/src/exec/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/be/src/exec/CMakeLists.txt b/be/src/exec/CMakeLists.txt
index 6b33753..7d86f1c 100644
--- a/be/src/exec/CMakeLists.txt
+++ b/be/src/exec/CMakeLists.txt
@@ -73,6 +73,7 @@ add_library(Exec
   parquet-column-readers.cc
   parquet-column-stats.cc
   parquet-metadata-utils.cc
+  partial-sort-node.cc
   partitioned-aggregation-node.cc
   partitioned-aggregation-node-ir.cc
   partitioned-hash-join-builder.cc

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ad0c6e74/be/src/exec/exec-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/exec-node.cc b/be/src/exec/exec-node.cc
index 5618fef..7954660 100644
--- a/be/src/exec/exec-node.cc
+++ b/be/src/exec/exec-node.cc
@@ -42,6 +42,7 @@
 #include "exec/kudu-scan-node-mt.h"
 #include "exec/kudu-util.h"
 #include "exec/nested-loop-join-node.h"
+#include "exec/partial-sort-node.h"
 #include "exec/partitioned-aggregation-node.h"
 #include "exec/partitioned-hash-join-node.h"
 #include "exec/select-node.h"
@@ -330,9 +331,12 @@ Status ExecNode::CreateNode(ObjectPool* pool, const TPlanNode& tnode,
       *node = pool->Add(new SelectNode(pool, tnode, descs));
       break;
     case TPlanNodeType::SORT_NODE:
-      if (tnode.sort_node.use_top_n) {
+      if (tnode.sort_node.type == TSortType::PARTIAL) {
+        *node = pool->Add(new PartialSortNode(pool, tnode, descs));
+      } else if (tnode.sort_node.type == TSortType::TOPN) {
         *node = pool->Add(new TopNNode(pool, tnode, descs));
       } else {
+        DCHECK(tnode.sort_node.type == TSortType::TOTAL);
         *node = pool->Add(new SortNode(pool, tnode, descs));
       }
       break;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ad0c6e74/be/src/exec/partial-sort-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/partial-sort-node.cc b/be/src/exec/partial-sort-node.cc
new file mode 100644
index 0000000..4f485d5
--- /dev/null
+++ b/be/src/exec/partial-sort-node.cc
@@ -0,0 +1,172 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "exec/partial-sort-node.h"
+
+#include "runtime/row-batch.h"
+#include "runtime/runtime-state.h"
+#include "runtime/sorted-run-merger.h"
+#include "util/runtime-profile-counters.h"
+
+#include "common/names.h"
+
+namespace impala {
+
+PartialSortNode::PartialSortNode(
+    ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs)
+  : ExecNode(pool, tnode, descs),
+    sorter_(nullptr),
+    input_batch_index_(0),
+    input_eos_(false),
+    sorter_eos_(true) {}
+
+PartialSortNode::~PartialSortNode() {
+  DCHECK(input_batch_.get() == nullptr);
+}
+
+Status PartialSortNode::Init(const TPlanNode& tnode, RuntimeState* state) {
+  DCHECK(!tnode.sort_node.__isset.offset || tnode.sort_node.offset == 0);
+  DCHECK(limit_ == -1);
+  const TSortInfo& tsort_info = tnode.sort_node.sort_info;
+  RETURN_IF_ERROR(ExecNode::Init(tnode, state));
+  RETURN_IF_ERROR(ScalarExpr::Create(
+      tsort_info.ordering_exprs, row_descriptor_, state, &ordering_exprs_));
+  DCHECK(tsort_info.__isset.sort_tuple_slot_exprs);
+  RETURN_IF_ERROR(ScalarExpr::Create(tsort_info.sort_tuple_slot_exprs,
+      *child(0)->row_desc(), state, &sort_tuple_exprs_));
+  is_asc_order_ = tnode.sort_node.sort_info.is_asc_order;
+  nulls_first_ = tnode.sort_node.sort_info.nulls_first;
+  return Status::OK();
+}
+
+Status PartialSortNode::Prepare(RuntimeState* state) {
+  SCOPED_TIMER(runtime_profile_->total_time_counter());
+  RETURN_IF_ERROR(ExecNode::Prepare(state));
+  less_than_.reset(new TupleRowComparator(ordering_exprs_, is_asc_order_, nulls_first_));
+  sorter_.reset(new Sorter(*less_than_, sort_tuple_exprs_, &row_descriptor_,
+      mem_tracker(), runtime_profile(), state, false));
+  RETURN_IF_ERROR(sorter_->Prepare(pool_, expr_mem_pool()));
+  AddCodegenDisabledMessage(state);
+  input_batch_.reset(
+      new RowBatch(child(0)->row_desc(), state->batch_size(), mem_tracker()));
+  return Status::OK();
+}
+
+void PartialSortNode::Codegen(RuntimeState* state) {
+  DCHECK(state->ShouldCodegen());
+  ExecNode::Codegen(state);
+  if (IsNodeCodegenDisabled()) return;
+  Status codegen_status = less_than_->Codegen(state);
+  runtime_profile()->AddCodegenMsg(codegen_status.ok(), codegen_status);
+}
+
+Status PartialSortNode::Open(RuntimeState* state) {
+  SCOPED_TIMER(runtime_profile_->total_time_counter());
+  RETURN_IF_ERROR(ExecNode::Open(state));
+  RETURN_IF_ERROR(less_than_->Open(pool_, state, expr_mem_pool()));
+  RETURN_IF_CANCELLED(state);
+  RETURN_IF_ERROR(QueryMaintenance(state));
+  RETURN_IF_ERROR(child(0)->Open(state));
+  return Status::OK();
+}
+
+Status PartialSortNode::GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos) {
+  SCOPED_TIMER(runtime_profile_->total_time_counter());
+  RETURN_IF_ERROR(ExecDebugAction(TExecNodePhase::GETNEXT, state));
+  RETURN_IF_CANCELLED(state);
+  RETURN_IF_ERROR(QueryMaintenance(state));
+
+  DCHECK_EQ(row_batch->num_rows(), 0);
+  if (!sorter_eos_) {
+    // There were rows in the current run that didn't fit in the last output batch.
+    RETURN_IF_ERROR(sorter_->GetNext(row_batch, &sorter_eos_));
+    if (sorter_eos_) {
+      sorter_->Reset();
+      *eos = input_eos_;
+    }
+    num_rows_returned_ += row_batch->num_rows();
+    COUNTER_SET(rows_returned_counter_, num_rows_returned_);
+    return Status::OK();
+  }
+
+  if (input_eos_) {
+    *eos = true;
+    return Status::OK();
+  }
+
+  DCHECK(sorter_eos_);
+  RETURN_IF_ERROR(sorter_->Open());
+  do {
+    if (input_batch_index_ == input_batch_->num_rows()) {
+      input_batch_->Reset();
+      input_batch_index_ = 0;
+      RETURN_IF_ERROR(child(0)->GetNext(state, input_batch_.get(), &input_eos_));
+    }
+
+    int num_processed;
+    RETURN_IF_ERROR(
+        sorter_->AddBatchNoSpill(input_batch_.get(), input_batch_index_, &num_processed));
+    input_batch_index_ += num_processed;
+    DCHECK(input_batch_index_ <= input_batch_->num_rows());
+    RETURN_IF_ERROR(QueryMaintenance(state));
+  } while (input_batch_index_ == input_batch_->num_rows() && !input_eos_);
+
+  RETURN_IF_ERROR(sorter_->InputDone());
+  RETURN_IF_ERROR(sorter_->GetNext(row_batch, &sorter_eos_));
+  if (sorter_eos_) {
+    sorter_->Reset();
+    *eos = input_eos_;
+  }
+
+  num_rows_returned_ += row_batch->num_rows();
+  COUNTER_SET(rows_returned_counter_, num_rows_returned_);
+  return Status::OK();
+}
+
+Status PartialSortNode::Reset(RuntimeState* state) {
+  DCHECK(false) << "PartialSortNode cannot be part of a subplan.";
+  return ExecNode::Reset(state);
+}
+
+void PartialSortNode::Close(RuntimeState* state) {
+  if (is_closed()) return;
+  child(0)->Close(state);
+  if (less_than_.get() != nullptr) less_than_->Close(state);
+  if (sorter_ != nullptr) sorter_->Close(state);
+  sorter_.reset();
+  ScalarExpr::Close(ordering_exprs_);
+  ScalarExpr::Close(sort_tuple_exprs_);
+  input_batch_.reset();
+  ExecNode::Close(state);
+}
+
+Status PartialSortNode::QueryMaintenance(RuntimeState* state) {
+  sorter_->FreeLocalAllocations();
+  return ExecNode::QueryMaintenance(state);
+}
+
+void PartialSortNode::DebugString(int indentation_level, stringstream* out) const {
+  *out << string(indentation_level * 2, ' ');
+  *out << "PartialSortNode(" << ScalarExpr::DebugString(ordering_exprs_);
+  for (int i = 0; i < is_asc_order_.size(); ++i) {
+    *out << (i > 0 ? " " : "") << (is_asc_order_[i] ? "asc" : "desc") << " nulls "
+         << (nulls_first_[i] ? "first" : "last");
+  }
+  ExecNode::DebugString(indentation_level, out);
+  *out << ")";
+}
+}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ad0c6e74/be/src/exec/partial-sort-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/partial-sort-node.h b/be/src/exec/partial-sort-node.h
new file mode 100644
index 0000000..ab4c547
--- /dev/null
+++ b/be/src/exec/partial-sort-node.h
@@ -0,0 +1,100 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef IMPALA_EXEC_PARTIAL_SORT_NODE_H
+#define IMPALA_EXEC_PARTIAL_SORT_NODE_H
+
+#include "exec/exec-node.h"
+#include "runtime/buffered-block-mgr.h"
+#include "runtime/sorter.h"
+
+namespace impala {
+
+/// Node that implements a partial sort, where its input is divided up into runs, each
+/// of which is sorted individually.
+///
+/// In GetNext(), PartialSortNode accepts rows up to its memory limit and sorts them,
+/// creating a single sorted run. It then outputs as many rows as fit in the output batch.
+/// Subsequent calls to GetNext() continue to output rows from the sorted run until it is
+/// exhausted, at which point the next call to GetNext() will again accept rows to create
+/// another run. This means that PartialSortNode never spills to disk.
+///
+/// Uses Sorter and BufferedBlockMgr to build and sort each run in memory. The sorter
+/// instance owns the sorted data.
+///
+/// Input rows to PartialSortNode may consist of several tuples. The Sorter materializes
+/// them into a single tuple using the expressions specified in sort_tuple_exprs_. This
+/// single tuple is then what the sort operates on.
+///
+/// PartialSortNode does not support limits or offsets.
+class PartialSortNode : public ExecNode {
+ public:
+  PartialSortNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs);
+  ~PartialSortNode();
+
+  virtual Status Init(const TPlanNode& tnode, RuntimeState* state);
+  virtual Status Prepare(RuntimeState* state);
+  virtual void Codegen(RuntimeState* state);
+  virtual Status Open(RuntimeState* state);
+  virtual Status GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos);
+  virtual Status Reset(RuntimeState* state);
+  virtual void Close(RuntimeState* state);
+
+ protected:
+  virtual Status QueryMaintenance(RuntimeState* state);
+  virtual void DebugString(int indentation_level, std::stringstream* out) const;
+
+ private:
+  /// Compares tuples according to 'ordering_exprs_'.
+  boost::scoped_ptr<TupleRowComparator> less_than_;
+
+  /// Expressions and parameters used for tuple comparison.
+  std::vector<ScalarExpr*> ordering_exprs_;
+
+  /// Expressions used to materialize slots in the tuples to be sorted.
+  /// One expr per slot in the materialized tuple.
+  std::vector<ScalarExpr*> sort_tuple_exprs_;
+
+  std::vector<bool> is_asc_order_;  // true = asc, false = desc.
+  std::vector<bool> nulls_first_;  // true = nulls first, false = nulls last.
+
+  /////////////////////////////////////////
+  /// BEGIN: Members that must be Reset()
+
+  /// Object used to sort each run. Owns the sorted data.
+  boost::scoped_ptr<Sorter> sorter_;
+
+  /// The current batch of rows retrieved from the input (the output of child(0)). This
+  /// allows us to store rows across calls to GetNext when the sorter run fills up.
+  std::unique_ptr<RowBatch> input_batch_;
+
+  /// The index in 'input_batch_' of the next row to be passed to the sorter.
+  int input_batch_index_;
+
+  /// True if the end of the input (the output of child(0)) has been reached.
+  bool input_eos_;
+
+  /// True if the current run in the sorter has been fully output. This node is done when
+  /// both 'sorter_eos_' and 'input_eos_' are true.
+  bool sorter_eos_;
+
+  /// END: Members that must be Reset()
+  /////////////////////////////////////////
+};
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ad0c6e74/be/src/exec/sort-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/sort-node.h b/be/src/exec/sort-node.h
index cbe5b68..8b3de11 100644
--- a/be/src/exec/sort-node.h
+++ b/be/src/exec/sort-node.h
@@ -52,15 +52,15 @@ class SortNode : public ExecNode {
 
  private:
   /// Fetch input rows and feed them to the sorter until the input is exhausted.
-  Status SortInput(RuntimeState* state);
+  Status SortInput(RuntimeState* state) WARN_UNUSED_RESULT;
 
   /// Number of rows to skip.
   int64_t offset_;
 
-  /// The tuple row comparator derived based on 'sort_exec_exprs_'.
+  /// Compares tuples according to 'ordering_exprs_'.
   boost::scoped_ptr<TupleRowComparator> less_than_;
 
-  /// Expressions and parameters used for tuple materialization and tuple comparison.
+  /// Expressions and parameters used for tuple comparison.
   std::vector<ScalarExpr*> ordering_exprs_;
 
   /// Expressions used to materialize slots in the tuples to be sorted.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ad0c6e74/be/src/runtime/sorter.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/sorter.cc b/be/src/runtime/sorter.cc
index 6760373..b4ef279 100644
--- a/be/src/runtime/sorter.cc
+++ b/be/src/runtime/sorter.cc
@@ -1339,9 +1339,9 @@ inline void Sorter::TupleSorter::Swap(Tuple* left, Tuple* right, Tuple* swap_tup
 }
 
 Sorter::Sorter(const TupleRowComparator& compare_less_than,
-    const vector<ScalarExpr*>& sort_tuple_exprs,
-    RowDescriptor* output_row_desc, MemTracker* mem_tracker,
-    RuntimeProfile* profile, RuntimeState* state)
+    const vector<ScalarExpr*>& sort_tuple_exprs, RowDescriptor* output_row_desc,
+    MemTracker* mem_tracker, RuntimeProfile* profile, RuntimeState* state,
+    bool enable_spilling)
   : state_(state),
     compare_less_than_(compare_less_than),
     in_mem_tuple_sorter_(NULL),
@@ -1351,14 +1351,15 @@ Sorter::Sorter(const TupleRowComparator& compare_less_than,
     sort_tuple_exprs_(sort_tuple_exprs),
     mem_tracker_(mem_tracker),
     output_row_desc_(output_row_desc),
+    enable_spilling_(enable_spilling),
     unsorted_run_(NULL),
     merge_output_run_(NULL),
     profile_(profile),
     initial_runs_counter_(NULL),
     num_merges_counter_(NULL),
     in_mem_sort_timer_(NULL),
-    sorted_data_size_(NULL) {
-}
+    sorted_data_size_(NULL),
+    run_sizes_(NULL) {}
 
 Sorter::~Sorter() {
   DCHECK(sorted_runs_.empty());
@@ -1379,12 +1380,15 @@ Status Sorter::Prepare(ObjectPool* obj_pool, MemPool* expr_mem_pool) {
   num_merges_counter_ = ADD_COUNTER(profile_, "TotalMergesPerformed", TUnit::UNIT);
   in_mem_sort_timer_ = ADD_TIMER(profile_, "InMemorySortTime");
   sorted_data_size_ = ADD_COUNTER(profile_, "SortDataSize", TUnit::BYTES);
+  run_sizes_ = ADD_SUMMARY_STATS_COUNTER(profile_, "NumRowsPerRun", TUnit::UNIT);
 
+  // If spilling is enabled, we need enough buffers to perform merges. Otherwise, there
+  // won't be any merges and we only need 1 buffer.
   // Must be kept in sync with SortNode.computeResourceProfile() in fe.
-  int min_buffers_required = MIN_BUFFERS_PER_MERGE;
-  // Fixed and var-length blocks are separate, so we need MIN_BUFFERS_PER_MERGE
-  // blocks for both if there is var-length data.
-  if (has_var_len_slots_) min_buffers_required *= 2;
+  int min_buffers_required = enable_spilling_ ? MIN_BUFFERS_PER_MERGE : 1;
+  // Fixed and var-length blocks are separate, so we need twice as many blocks if there
+  // is var-length data.
+  if (sort_tuple_desc->HasVarlenSlots()) min_buffers_required *= 2;
 
   RETURN_IF_ERROR(block_mgr_->RegisterClient(Substitute("Sorter ptr=$0", this),
       min_buffers_required, false, mem_tracker_, state_, &block_mgr_client_));
@@ -1412,10 +1416,11 @@ void Sorter::FreeLocalAllocations() {
 Status Sorter::AddBatch(RowBatch* batch) {
   DCHECK(unsorted_run_ != NULL);
   DCHECK(batch != NULL);
+  DCHECK(enable_spilling_);
   int num_processed = 0;
   int cur_batch_index = 0;
   while (cur_batch_index < batch->num_rows()) {
-    RETURN_IF_ERROR(unsorted_run_->AddInputBatch(batch, cur_batch_index, &num_processed));
+    RETURN_IF_ERROR(AddBatchNoSpill(batch, cur_batch_index, &num_processed));
 
     cur_batch_index += num_processed;
     if (cur_batch_index < batch->num_rows()) {
@@ -1430,6 +1435,12 @@ Status Sorter::AddBatch(RowBatch* batch) {
   return Status::OK();
 }
 
+Status Sorter::AddBatchNoSpill(RowBatch* batch, int start_index, int* num_processed) {
+  DCHECK(unsorted_run_ != nullptr);  // Set to NULL by SortCurrentInputRun().
+  DCHECK(batch != nullptr);
+  return unsorted_run_->AddInputBatch(batch, start_index, num_processed);
+}
+
 Status Sorter::InputDone() {
   // Sort the tuples in the last run.
   RETURN_IF_ERROR(SortCurrentInputRun());
@@ -1443,6 +1454,7 @@ Status Sorter::InputDone() {
     DCHECK(success) << "Should always be able to prepare pinned run for read.";
     return Status::OK();
   }
+  DCHECK(enable_spilling_);
 
   // Unpin the final run to free up memory for the merge.
   // TODO: we could keep it in memory in some circumstances as an optimisation, once
@@ -1498,6 +1510,7 @@ Status Sorter::SortCurrentInputRun() {
   }
   sorted_runs_.push_back(unsorted_run_);
   sorted_data_size_->Add(unsorted_run_->TotalBytes());
+  run_sizes_->UpdateCounter(unsorted_run_->num_tuples());
   unsorted_run_ = NULL;
 
   RETURN_IF_CANCELLED(state_);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ad0c6e74/be/src/runtime/sorter.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/sorter.h b/be/src/runtime/sorter.h
index da3c6ef..80c5558 100644
--- a/be/src/runtime/sorter.h
+++ b/be/src/runtime/sorter.h
@@ -38,15 +38,19 @@ class RowBatch;
 /// AddBatch() is used to add input rows to be sorted. Multiple tuples in an input row are
 /// materialized into a row with a single tuple (the sort tuple) using the materialization
 /// exprs in sort_tuple_exprs_. The sort tuples are sorted according to the sort
-/// parameters and output by the sorter.
-/// AddBatch() can be called multiple times.
+/// parameters and output by the sorter. AddBatch() can be called multiple times.
+//
+/// Callers that don't want to spill can use AddBatchNoSpill() instead, which only adds
+/// rows up to the memory limit and then returns the number of rows that were added.
+/// For this use case, 'enable_spilling' should be false so that the sorter can reduce
+/// the number of buffers requested from the block mgr since there won't be merges.
 //
 /// InputDone() is called to indicate the end of input. If multiple sorted runs were
 /// created, it triggers intermediate merge steps (if necessary) and creates the final
 /// merger that returns results via GetNext().
 //
 /// GetNext() is used to retrieve sorted rows. It can be called multiple times.
-/// AddBatch(), InputDone() and GetNext() must be called in that order.
+/// AddBatch()/AddBatchNoSpill(), InputDone() and GetNext() must be called in that order.
 //
 /// Batches of input rows are collected into a sequence of pinned BufferedBlockMgr blocks
 /// called a run. The maximum size of a run is determined by the number of blocks that
@@ -92,11 +96,13 @@ class Sorter {
   /// 'sort_tuple_exprs' are the slot exprs used to materialize the tuples to be
   /// sorted. 'compare_less_than' is a comparator for the sort tuples (returns true if
   /// lhs < rhs). 'merge_batch_size_' is the size of the batches created to provide rows
-  /// to the merger and retrieve rows from an intermediate merger.
+  /// to the merger and retrieve rows from an intermediate merger. 'enable_spilling'
+  /// should be set to false to reduce the number of requested buffers if the caller will
+  /// use AddBatchNoSpill().
   Sorter(const TupleRowComparator& compare_less_than,
-      const std::vector<ScalarExpr*>& sort_tuple_exprs,
-      RowDescriptor* output_row_desc, MemTracker* mem_tracker,
-      RuntimeProfile* profile, RuntimeState* state);
+      const std::vector<ScalarExpr*>& sort_tuple_exprs, RowDescriptor* output_row_desc,
+      MemTracker* mem_tracker, RuntimeProfile* profile, RuntimeState* state,
+      bool enable_spilling = true);
 
   ~Sorter();
 
@@ -109,9 +115,16 @@ class Sorter {
   /// the tuples. Must be called after Prepare() or Reset() and before calling AddBatch().
   Status Open() WARN_UNUSED_RESULT;
 
-  /// Adds a batch of input rows to the current unsorted run.
+  /// Adds the entire batch of input rows to the sorter. If the current unsorted run fills
+  /// up, it is sorted and a new unsorted run is created. Cannot be called if
+  /// 'enable_spilling' is false.
   Status AddBatch(RowBatch* batch) WARN_UNUSED_RESULT;
 
+  /// Adds input rows to the current unsorted run, starting from 'start_index' up to the
+  /// memory limit. Returns the number of rows added in 'num_processed'.
+  Status AddBatchNoSpill(
+      RowBatch* batch, int start_index, int* num_processed) WARN_UNUSED_RESULT;
+
   /// Called to indicate there is no more input. Triggers the creation of merger(s) if
   /// necessary.
   Status InputDone() WARN_UNUSED_RESULT;
@@ -191,6 +204,9 @@ class Sorter {
   /// sorting. Not owned by the Sorter.
   RowDescriptor* output_row_desc_;
 
+  /// True if this sorter can spill. Used to determine the number of buffers to reserve.
+  bool enable_spilling_;
+
   /////////////////////////////////////////
   /// BEGIN: Members that must be Reset()
 
@@ -242,6 +258,9 @@ class Sorter {
 
   /// Total size of the initial runs in bytes.
   RuntimeProfile::Counter* sorted_data_size_;
+
+  /// Min, max, and avg size of runs in number of tuples.
+  RuntimeProfile::SummaryStatsCounter* run_sizes_;
 };
 
 } // namespace impala

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ad0c6e74/be/src/util/runtime-profile-counters.h
----------------------------------------------------------------------
diff --git a/be/src/util/runtime-profile-counters.h b/be/src/util/runtime-profile-counters.h
index b37235f..40f72c6 100644
--- a/be/src/util/runtime-profile-counters.h
+++ b/be/src/util/runtime-profile-counters.h
@@ -48,6 +48,8 @@ namespace impala {
   #define ADD_TIMER(profile, name) (profile)->AddCounter(name, TUnit::TIME_NS)
   #define ADD_SUMMARY_STATS_TIMER(profile, name) \
       (profile)->AddSummaryStatsCounter(name, TUnit::TIME_NS)
+  #define ADD_SUMMARY_STATS_COUNTER(profile, name, unit) \
+      (profile)->AddSummaryStatsCounter(name, unit)
   #define ADD_CHILD_TIMER(profile, name, parent) \
       (profile)->AddCounter(name, TUnit::TIME_NS, parent)
   #define SCOPED_TIMER(c) \

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ad0c6e74/common/thrift/PlanNodes.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/PlanNodes.thrift b/common/thrift/PlanNodes.thrift
index e5e7f24..c1ff302 100644
--- a/common/thrift/PlanNodes.thrift
+++ b/common/thrift/PlanNodes.thrift
@@ -346,11 +346,22 @@ struct TSortInfo {
   4: optional list<Exprs.TExpr> sort_tuple_slot_exprs
 }
 
+enum TSortType {
+  // Sort the entire input.
+  TOTAL,
+
+  // Return the first N sorted elements. Requires a limit.
+  TOPN,
+
+  // Divide the input into runs, each of which is sorted individually.
+  PARTIAL
+}
+
 struct TSortNode {
   1: required TSortInfo sort_info
-  // Indicates whether the backend service should use topn vs. sorting
-  2: required bool use_top_n;
-  // This is the number of rows to skip before returning results
+  2: required TSortType type  // Selects the backend exec node for this sort.
+  // This is the number of rows to skip before returning results.
+  // Not used with TSortType::PARTIAL.
   3: optional i64 offset
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ad0c6e74/fe/src/main/java/org/apache/impala/planner/AnalyticPlanner.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/AnalyticPlanner.java b/fe/src/main/java/org/apache/impala/planner/AnalyticPlanner.java
index 08dd9f5..41ff9d2 100644
--- a/fe/src/main/java/org/apache/impala/planner/AnalyticPlanner.java
+++ b/fe/src/main/java/org/apache/impala/planner/AnalyticPlanner.java
@@ -356,7 +356,8 @@ public class AnalyticPlanner {
       }
 
       SortInfo sortInfo = createSortInfo(root, sortExprs, isAsc, nullsFirst);
-      SortNode sortNode = new SortNode(ctx_.getNextNodeId(), root, sortInfo, false, 0);
+      SortNode sortNode =
+          SortNode.createTotalSortNode(ctx_.getNextNodeId(), root, sortInfo, 0);
 
       // if this sort group does not have partitioning exprs, we want the sort
       // to be executed like a regular distributed sort

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ad0c6e74/fe/src/main/java/org/apache/impala/planner/Planner.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/Planner.java b/fe/src/main/java/org/apache/impala/planner/Planner.java
index c65c668..c202094 100644
--- a/fe/src/main/java/org/apache/impala/planner/Planner.java
+++ b/fe/src/main/java/org/apache/impala/planner/Planner.java
@@ -547,11 +547,13 @@ public class Planner {
        Analyzer analyzer) throws ImpalaException {
     List<Expr> orderingExprs = Lists.newArrayList();
 
+    boolean partialSort = false;
     if (insertStmt.getTargetTable() instanceof KuduTable) {
       if (!insertStmt.hasNoClusteredHint() && !ctx_.isSingleNodeExec()) {
         orderingExprs.add(
             KuduUtil.createPartitionExpr(insertStmt, ctx_.getRootAnalyzer()));
         orderingExprs.addAll(insertStmt.getPrimaryKeyExprs());
+        partialSort = true;
       }
     } else if (insertStmt.hasClusteredHint() || !insertStmt.getSortExprs().isEmpty()) {
       // NOTE: If the table has a 'sort.columns' property and the query has a
@@ -576,10 +578,16 @@ public class Planner {
 
     insertStmt.substituteResultExprs(smap, analyzer);
 
-    SortNode sortNode = new SortNode(ctx_.getNextNodeId(), inputFragment.getPlanRoot(),
-        sortInfo, false, 0);
-    sortNode.init(analyzer);
+    PlanNode node = null;
+    if (partialSort) {
+      node = SortNode.createPartialSortNode(
+          ctx_.getNextNodeId(), inputFragment.getPlanRoot(), sortInfo);
+    } else {
+      node = SortNode.createTotalSortNode(
+          ctx_.getNextNodeId(), inputFragment.getPlanRoot(), sortInfo, 0);
+    }
+    node.init(analyzer);
 
-    inputFragment.setPlanRoot(sortNode);
+    inputFragment.setPlanRoot(node);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ad0c6e74/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java b/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java
index 3e0692b..8d82409 100644
--- a/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java
+++ b/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java
@@ -295,8 +295,13 @@ public class SingleNodePlanner {
       // TODO: External sort could be used for very large limits
       // not just unlimited order-by
       boolean useTopN = stmt.hasLimit() && !disableTopN;
-      root = new SortNode(ctx_.getNextNodeId(), root, stmt.getSortInfo(),
-          useTopN, stmt.getOffset());
+      if (useTopN) {
+        root = SortNode.createTopNSortNode(
+            ctx_.getNextNodeId(), root, stmt.getSortInfo(), stmt.getOffset());
+      } else {
+        root = SortNode.createTotalSortNode(
+            ctx_.getNextNodeId(), root, stmt.getSortInfo(), stmt.getOffset());
+      }
       Preconditions.checkState(root.hasValidStats());
       root.setLimit(limit);
       root.init(analyzer);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ad0c6e74/fe/src/main/java/org/apache/impala/planner/SortNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/SortNode.java b/fe/src/main/java/org/apache/impala/planner/SortNode.java
index f628885..aee8fda 100644
--- a/fe/src/main/java/org/apache/impala/planner/SortNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/SortNode.java
@@ -36,20 +36,27 @@ import org.apache.impala.thrift.TPlanNodeType;
 import org.apache.impala.thrift.TQueryOptions;
 import org.apache.impala.thrift.TSortInfo;
 import org.apache.impala.thrift.TSortNode;
+import org.apache.impala.thrift.TSortType;
 import com.google.common.base.Joiner;
 import com.google.common.base.Objects;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 
 /**
- * Node that implements a sort with or without a limit. useTopN_ is true for sorts
- * with limits that are implemented by a TopNNode in the backend. SortNode is used
- * otherwise.
+ * Node that implements various types of sorts:
+ * - TOTAL: uses SortNode in the BE.
+ * - TOPN: uses TopNNode in the BE. Must have a limit.
+ * - PARTIAL: uses PartialSortNode in the BE. Cannot have a limit or offset.
+ *
  * Will always materialize the new tuple info_.sortTupleDesc_.
  */
 public class SortNode extends PlanNode {
   private final static Logger LOG = LoggerFactory.getLogger(SortNode.class);
 
+  // Memory limit for partial sorts, specified in bytes. TODO: determine the value for
+  // this, consider making it configurable, enforce it in the BE. (IMPALA-5669)
+  private final static long PARTIAL_SORT_MEM_LIMIT = 128L * 1024 * 1024;
+
   private final SortInfo info_;
 
   // if set, this SortNode requires its input to have this data partition
@@ -61,24 +68,50 @@ public class SortNode extends PlanNode {
   // info_.sortTupleSlotExprs_ substituted with the outputSmap_ for materialized slots
   // in init().
   private List<Expr> resolvedTupleExprs_;
-  private final boolean useTopN_;
+
   // The offset of the first row to return.
   protected long offset_;
 
-  public SortNode(PlanNodeId id, PlanNode input, SortInfo info, boolean useTopN,
-      long offset) {
-    super(id, info.getSortTupleDescriptor().getId().asList(),
-        getDisplayName(useTopN, false));
+  // The type of sort. Determines the exec node used in the BE.
+  private final TSortType type_;
+
+  /**
+   * Creates a new SortNode that implements a partial sort, which has no offset.
+   */
+  public static SortNode createPartialSortNode(
+      PlanNodeId id, PlanNode input, SortInfo info) {
+    return new SortNode(id, input, info, 0, TSortType.PARTIAL);
+  }
+
+  /**
+   * Creates a new SortNode with a limit that is executed with TopNNode in the BE.
+   */
+  public static SortNode createTopNSortNode(
+      PlanNodeId id, PlanNode input, SortInfo info, long offset) {
+    return new SortNode(id, input, info, offset, TSortType.TOPN);
+  }
+
+  /**
+   * Creates a new SortNode that does a total sort, possibly with a limit and offset.
+   */
+  public static SortNode createTotalSortNode(
+      PlanNodeId id, PlanNode input, SortInfo info, long offset) {
+    return new SortNode(id, input, info, offset, TSortType.TOTAL);
+  }
+
+  private SortNode(
+      PlanNodeId id, PlanNode input, SortInfo info, long offset, TSortType type) {
+    super(id, info.getSortTupleDescriptor().getId().asList(), getDisplayName(type));
     info_ = info;
-    useTopN_ = useTopN;
     children_.add(input);
     offset_ = offset;
+    type_ = type;
   }
 
   public long getOffset() { return offset_; }
   public void setOffset(long offset) { offset_ = offset; }
   public boolean hasOffset() { return offset_ > 0; }
-  public boolean useTopN() { return useTopN_; }
+  public boolean useTopN() { return type_ == TSortType.TOPN; }  // TopNNode in the BE.
   public SortInfo getSortInfo() { return info_; }
   public void setInputPartition(DataPartition inputPartition) {
     inputPartition_ = inputPartition;
@@ -88,7 +121,7 @@ public class SortNode extends PlanNode {
   public void setIsAnalyticSort(boolean v) { isAnalyticSort_ = v; }
 
   @Override
-  public boolean isBlockingNode() { return true; }
+  public boolean isBlockingNode() { return type_ != TSortType.PARTIAL; }  // Emits each run.
 
   @Override
   public void init(Analyzer analyzer) throws InternalException {
@@ -146,6 +179,7 @@ public class SortNode extends PlanNode {
       strings.add(isAsc ? "a" : "d");
     }
     return Objects.toStringHelper(this)
+        .add("type_", type_)
         .add("ordering_exprs", Expr.debugString(info_.getOrderingExprs()))
         .add("is_asc", "[" + Joiner.on(" ").join(strings) + "]")
         .add("nulls_first", "[" + Joiner.on(" ").join(info_.getNullsFirst()) + "]")
@@ -162,7 +196,7 @@ public class SortNode extends PlanNode {
     Preconditions.checkState(tupleIds_.size() == 1,
         "Incorrect size for tupleIds_ in SortNode");
     sort_info.setSort_tuple_slot_exprs(Expr.treesToThrift(resolvedTupleExprs_));
-    TSortNode sort_node = new TSortNode(sort_info, useTopN_);
+    TSortNode sort_node = new TSortNode(sort_info, type_);
     sort_node.setOffset(offset_);
     msg.sort_node = sort_node;
   }
@@ -218,7 +252,7 @@ public class SortNode extends PlanNode {
   @Override
   public void computeNodeResourceProfile(TQueryOptions queryOptions) {
     Preconditions.checkState(hasValidStats());
-    if (useTopN_) {
+    if (type_ == TSortType.TOPN) {
       long perInstanceMemEstimate =
               (long) Math.ceil((cardinality_ + offset_) * avgRowSize_);
       nodeResourceProfile_ = new ResourceProfile(perInstanceMemEstimate, 0);
@@ -245,22 +279,39 @@ public class SortNode extends PlanNode {
     // blocks on disk and reads from both sequences when merging. This effectively
     // doubles the block size when there are var-len columns present.
     if (hasVarLenSlots) blockSize *= 2;
-    double numInputBlocks = Math.ceil(fullInputSize / blockSize);
-    long perInstanceMemEstimate = blockSize * (long) Math.ceil(Math.sqrt(numInputBlocks));
 
-    // Must be kept in sync with min_buffers_required in Sorter in be.
-    long perInstanceMinReservation = 3 * getDefaultSpillableBufferBytes();
-    if (info_.getSortTupleDescriptor().hasVarLenSlots()) {
-      perInstanceMinReservation *= 2;
+    if (type_ == TSortType.PARTIAL) {
+      // The memory limit cannot be less than the size of the required blocks.
+      long mem_limit =
+          PARTIAL_SORT_MEM_LIMIT > blockSize ? PARTIAL_SORT_MEM_LIMIT : blockSize;
+      // 'fullInputSize' will be negative if stats are missing, just use the limit.
+      long perInstanceMemEstimate = fullInputSize < 0 ?
+          mem_limit :
+          Math.min((long) Math.ceil(fullInputSize), mem_limit);
+      nodeResourceProfile_ = new ResourceProfile(perInstanceMemEstimate, blockSize);
+    } else {
+      Preconditions.checkState(type_ == TSortType.TOTAL);
+      double numInputBlocks = Math.ceil(fullInputSize / blockSize);
+      long perInstanceMemEstimate =
+          blockSize * (long) Math.ceil(Math.sqrt(numInputBlocks));
+
+      // Must be kept in sync with min_buffers_required in Sorter in be.
+      long perInstanceMinReservation = 3 * getDefaultSpillableBufferBytes();
+      if (info_.getSortTupleDescriptor().hasVarLenSlots()) {
+        perInstanceMinReservation *= 2;
+      }
+      nodeResourceProfile_ =
+          new ResourceProfile(perInstanceMemEstimate, perInstanceMinReservation);
     }
-    nodeResourceProfile_ =
-        new ResourceProfile(perInstanceMemEstimate, perInstanceMinReservation);
   }
 
-  private static String getDisplayName(boolean isTopN, boolean isMergeOnly) {
-    if (isTopN) {
+  private static String getDisplayName(TSortType type) {  // Label used in plan output.
+    if (type == TSortType.TOPN) {
       return "TOP-N";
+    } else if (type == TSortType.PARTIAL) {
+      return "PARTIAL SORT";
     } else {
+      Preconditions.checkState(type == TSortType.TOTAL);  // Fails on unhandled types.
       return "SORT";
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ad0c6e74/testdata/workloads/functional-planner/queries/PlannerTest/kudu-upsert.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/kudu-upsert.test b/testdata/workloads/functional-planner/queries/PlannerTest/kudu-upsert.test
index 2bc5df7..c538e57 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/kudu-upsert.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/kudu-upsert.test
@@ -10,7 +10,7 @@ UPSERT INTO KUDU [functional_kudu.testtbl]
 ---- DISTRIBUTEDPLAN
 UPSERT INTO KUDU [functional_kudu.testtbl]
 |
-02:SORT
+02:PARTIAL SORT
 |  order by: KuduPartition(bigint_col) ASC NULLS LAST, bigint_col ASC NULLS LAST
 |
 01:EXCHANGE [KUDU(KuduPartition(bigint_col))]
@@ -51,7 +51,7 @@ UPSERT INTO KUDU [functional_kudu.testtbl]
 ---- DISTRIBUTEDPLAN
 UPSERT INTO KUDU [functional_kudu.testtbl]
 |
-08:SORT
+08:PARTIAL SORT
 |  order by: KuduPartition(a.bigint_col) ASC NULLS LAST, bigint_col ASC NULLS LAST
 |
 07:EXCHANGE [KUDU(KuduPartition(a.bigint_col))]
@@ -97,7 +97,7 @@ UPSERT INTO KUDU [functional_kudu.testtbl]
 ---- DISTRIBUTEDPLAN
 UPSERT INTO KUDU [functional_kudu.testtbl]
 |
-05:SORT
+05:PARTIAL SORT
 |  order by: KuduPartition(id) ASC NULLS LAST, id ASC NULLS LAST
 |
 04:EXCHANGE [KUDU(KuduPartition(id))]
@@ -125,7 +125,7 @@ UPSERT INTO KUDU [functional_kudu.testtbl]
 ---- DISTRIBUTEDPLAN
 UPSERT INTO KUDU [functional_kudu.testtbl]
 |
-02:SORT
+02:PARTIAL SORT
 |  order by: KuduPartition(functional_kudu.testtbl.id) ASC NULLS LAST, id ASC NULLS LAST
 |
 01:EXCHANGE [KUDU(KuduPartition(functional_kudu.testtbl.id))]
@@ -148,7 +148,7 @@ UPSERT INTO KUDU [functional_kudu.testtbl]
 ---- DISTRIBUTEDPLAN
 UPSERT INTO KUDU [functional_kudu.testtbl]
 |
-04:SORT
+04:PARTIAL SORT
 |  order by: KuduPartition(id) ASC NULLS LAST, id ASC NULLS LAST
 |
 03:EXCHANGE [KUDU(KuduPartition(id))]
@@ -175,7 +175,7 @@ UPSERT INTO KUDU [functional_kudu.alltypes]
 ---- DISTRIBUTEDPLAN
 UPSERT INTO KUDU [functional_kudu.alltypes]
 |
-05:SORT
+05:PARTIAL SORT
 |  order by: KuduPartition(id) ASC NULLS LAST, id ASC NULLS LAST
 |
 04:EXCHANGE [KUDU(KuduPartition(id))]
@@ -195,7 +195,7 @@ upsert into functional_kudu.alltypes /* +noshuffle */ select * from functional.a
 ---- DISTRIBUTEDPLAN
 UPSERT INTO KUDU [functional_kudu.alltypes]
 |
-01:SORT
+01:PARTIAL SORT
 |  order by: KuduPartition(functional.alltypes.id) ASC NULLS LAST, id ASC NULLS LAST
 |
 00:SCAN HDFS [functional.alltypes]
@@ -216,7 +216,7 @@ upsert into functional_kudu.alltypes /* +noshuffle */ select * from functional.a
 ---- DISTRIBUTEDPLAN
 UPSERT INTO KUDU [functional_kudu.alltypes]
 |
-01:SORT
+01:PARTIAL SORT
 |  order by: KuduPartition(functional.alltypes.id) ASC NULLS LAST, id ASC NULLS LAST
 |
 00:SCAN HDFS [functional.alltypes]

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ad0c6e74/testdata/workloads/functional-planner/queries/PlannerTest/kudu.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/kudu.test b/testdata/workloads/functional-planner/queries/PlannerTest/kudu.test
index 16cb3a9..436aa51 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/kudu.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/kudu.test
@@ -50,7 +50,7 @@ INSERT INTO KUDU [functional_kudu.testtbl]
 ---- DISTRIBUTEDPLAN
 INSERT INTO KUDU [functional_kudu.testtbl]
 |
-02:SORT
+02:PARTIAL SORT
 |  order by: KuduPartition(10) ASC NULLS LAST, 10 ASC NULLS LAST
 |
 01:EXCHANGE [KUDU(KuduPartition(10))]
@@ -66,7 +66,7 @@ INSERT INTO KUDU [functional_kudu.testtbl]
 ---- DISTRIBUTEDPLAN
 INSERT INTO KUDU [functional_kudu.testtbl]
 |
-02:SORT
+02:PARTIAL SORT
 |  order by: KuduPartition(int_col) ASC NULLS LAST, int_col ASC NULLS LAST
 |
 01:EXCHANGE [KUDU(KuduPartition(int_col))]
@@ -90,7 +90,7 @@ INSERT INTO KUDU [functional_kudu.testtbl]
 ---- DISTRIBUTEDPLAN
 INSERT INTO KUDU [functional_kudu.testtbl]
 |
-06:SORT
+06:PARTIAL SORT
 |  order by: KuduPartition(count(id)) ASC NULLS LAST, count(id) ASC NULLS LAST
 |
 05:EXCHANGE [KUDU(KuduPartition(count(id)))]
@@ -264,7 +264,7 @@ INSERT INTO KUDU [functional_kudu.alltypes]
 ---- DISTRIBUTEDPLAN
 INSERT INTO KUDU [functional_kudu.alltypes]
 |
-02:SORT
+02:PARTIAL SORT
 |  order by: KuduPartition(functional_kudu.alltypes.id) ASC NULLS LAST, id ASC NULLS LAST
 |
 01:EXCHANGE [KUDU(KuduPartition(functional_kudu.alltypes.id))]
@@ -288,7 +288,7 @@ INSERT INTO KUDU [functional_kudu.testtbl]
 ---- DISTRIBUTEDPLAN
 INSERT INTO KUDU [functional_kudu.testtbl]
 |
-05:SORT
+05:PARTIAL SORT
 |  order by: KuduPartition(id) ASC NULLS LAST, id ASC NULLS LAST
 |
 04:EXCHANGE [KUDU(KuduPartition(id))]
@@ -394,7 +394,7 @@ insert into functional_kudu.alltypes /* +noshuffle */ select * from functional.a
 ---- DISTRIBUTEDPLAN
 INSERT INTO KUDU [functional_kudu.alltypes]
 |
-01:SORT
+01:PARTIAL SORT
 |  order by: KuduPartition(functional.alltypes.id) ASC NULLS LAST, id ASC NULLS LAST
 |
 00:SCAN HDFS [functional.alltypes]

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ad0c6e74/testdata/workloads/functional-query/queries/QueryTest/kudu_insert.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/kudu_insert.test b/testdata/workloads/functional-query/queries/QueryTest/kudu_insert.test
index 71b09fc..76ad779 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/kudu_insert.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/kudu_insert.test
@@ -435,4 +435,6 @@ NumRowErrors: 1
 set mem_limit=400m;
 create table kudu_test primary key(a, b) partition by hash(a, b) partitions 8 stored as kudu as
 select l_orderkey a, concat(l_comment, l_comment, l_comment) b from tpch.lineitem
-====
\ No newline at end of file
+---- RUNTIME_PROFILE
+row_regex: .*SpilledRuns: 0 \(0\)
+====