You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by kw...@apache.org on 2018/08/30 06:18:15 UTC

[1/7] impala git commit: IMPALA-7421. Static methods use wrong JNI call function

Repository: impala
Updated Branches:
  refs/heads/master c6ce735d1 -> 5c541b960


IMPALA-7421. Static methods use wrong JNI call function

Various places throughout the code were using the JNI CallObjectMethod
function when in fact they should have been using
CallStaticObjectMethod. It appears that this doesn't cause crashes in
release builds, but if running with -Xcheck:jni, it causes an assertion
failure:

   guarantee(method->size_of_parameters() == size_of_parameters())
      failed: wrong no. of arguments pushed

This patch cleans up JniUtil a bit to add a "fluent" style utility for
calling JNI functions. This calling style should make the above mistake
more obvious because it forces the caller to write 'static_method(...)' for
static methods and 'instance_method(...)' for instance methods.

Change-Id: If7cde6ca91613b63afe5307f4d819fb24cb17fd6
Reviewed-on: http://gerrit.cloudera.org:8080/11181
Reviewed-by: Bharath Vissapragada <bh...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/54742300
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/54742300
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/54742300

Branch: refs/heads/master
Commit: 547423007b82c8181be277842f092bfd86f5b720
Parents: c6ce735
Author: Todd Lipcon <to...@apache.org>
Authored: Thu Aug 9 21:48:53 2018 -0700
Committer: Todd Lipcon <to...@apache.org>
Committed: Tue Aug 28 23:40:52 2018 +0000

----------------------------------------------------------------------
 be/src/common/init.cc            |   1 +
 be/src/common/status.h           |   2 +-
 be/src/service/fe-support.h      |   2 -
 be/src/service/frontend.cc       |  20 +--
 be/src/util/backend-gflag-util.h |   3 +-
 be/src/util/jni-util.cc          |  18 ++-
 be/src/util/jni-util.h           | 255 ++++++++++++++++++++++++++--------
 be/src/util/logging-support.cc   |  14 +-
 be/src/util/logging-support.h    |   1 -
 be/src/util/zip-util.cc          |   3 +-
 be/src/util/zip-util.h           |   3 +-
 11 files changed, 223 insertions(+), 99 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/54742300/be/src/common/init.cc
----------------------------------------------------------------------
diff --git a/be/src/common/init.cc b/be/src/common/init.cc
index bc1065b..ed61fe4 100644
--- a/be/src/common/init.cc
+++ b/be/src/common/init.cc
@@ -41,6 +41,7 @@
 #include "util/debug-util.h"
 #include "util/decimal-util.h"
 #include "util/disk-info.h"
+#include "util/jni-util.h"
 #include "util/logging-support.h"
 #include "util/mem-info.h"
 #include "util/memory-metrics.h"

http://git-wip-us.apache.org/repos/asf/impala/blob/54742300/be/src/common/status.h
----------------------------------------------------------------------
diff --git a/be/src/common/status.h b/be/src/common/status.h
index b211376..3c0a7e5 100644
--- a/be/src/common/status.h
+++ b/be/src/common/status.h
@@ -105,7 +105,7 @@ class NODISCARD Status {
 
   /// Move constructor that moves the error message (if any) and resets 'other' to the
   /// default OK Status.
-  ALWAYS_INLINE Status(Status&& other) : msg_(other.msg_) { other.msg_ = NULL; }
+  ALWAYS_INLINE Status(Status&& other) noexcept : msg_(other.msg_) { other.msg_ = NULL; }
 
   /// Status using only the error code as a parameter. This can be used for error messages
   /// that don't take format parameters.

http://git-wip-us.apache.org/repos/asf/impala/blob/54742300/be/src/service/fe-support.h
----------------------------------------------------------------------
diff --git a/be/src/service/fe-support.h b/be/src/service/fe-support.h
index 11ad18b..2e17196 100644
--- a/be/src/service/fe-support.h
+++ b/be/src/service/fe-support.h
@@ -18,8 +18,6 @@
 #ifndef IMPALA_SERVICE_FE_SUPPORT_H
 #define IMPALA_SERVICE_FE_SUPPORT_H
 
-#include "util/jni-util.h"
-
 namespace impala {
 
 /// InitFeSupport registers native functions with JNI. When the java

http://git-wip-us.apache.org/repos/asf/impala/blob/54742300/be/src/service/frontend.cc
----------------------------------------------------------------------
diff --git a/be/src/service/frontend.cc b/be/src/service/frontend.cc
index aec92f7..af45d96 100644
--- a/be/src/service/frontend.cc
+++ b/be/src/service/frontend.cc
@@ -224,23 +224,9 @@ Status Frontend::GetExplainPlan(
 Status Frontend::ValidateSettings() {
   // Use FE to check Hadoop config setting
   // TODO: check OS setting
-  stringstream ss;
-  JNIEnv* jni_env = getJNIEnv();
-  JniLocalFrame jni_frame;
-  RETURN_IF_ERROR(jni_frame.push(jni_env));
-  jstring error_string =
-      static_cast<jstring>(jni_env->CallObjectMethod(fe_, check_config_id_));
-  RETURN_ERROR_IF_EXC(jni_env);
-  jboolean is_copy;
-  const char *str = jni_env->GetStringUTFChars(error_string, &is_copy);
-  RETURN_ERROR_IF_EXC(jni_env);
-  ss << str;
-  jni_env->ReleaseStringUTFChars(error_string, str);
-  RETURN_ERROR_IF_EXC(jni_env);
-
-  if (ss.str().size() > 0) {
-    return Status(ss.str());
-  }
+  string err;
+  RETURN_IF_ERROR(JniCall::instance_method(fe_, check_config_id_).Call(&err));
+  if (!err.empty()) return Status(err);
   return Status::OK();
 }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/54742300/be/src/util/backend-gflag-util.h
----------------------------------------------------------------------
diff --git a/be/src/util/backend-gflag-util.h b/be/src/util/backend-gflag-util.h
index 796a2d5..d68ed2f 100644
--- a/be/src/util/backend-gflag-util.h
+++ b/be/src/util/backend-gflag-util.h
@@ -18,8 +18,9 @@
 #ifndef UTIL_BACKEND_CONFIG_H_
 #define UTIL_BACKEND_CONFIG_H_
 
+#include <jni.h>
+
 #include "common/status.h"
-#include "util/jni-util.h"
 
 namespace impala {
 

http://git-wip-us.apache.org/repos/asf/impala/blob/54742300/be/src/util/jni-util.cc
----------------------------------------------------------------------
diff --git a/be/src/util/jni-util.cc b/be/src/util/jni-util.cc
index 19e4e10..3e5beac 100644
--- a/be/src/util/jni-util.cc
+++ b/be/src/util/jni-util.cc
@@ -220,13 +220,9 @@ Status JniUtil::InitJvmPauseMonitor() {
   JniMethodDescriptor init_jvm_pm_desc = {
       "initPauseMonitor", "(J)V", &init_jvm_pm_method};
   RETURN_IF_ERROR(JniUtil::LoadStaticJniMethod(env, jni_util_cl_, &init_jvm_pm_desc));
-  JNIEnv* jni_env = getJNIEnv();
-  JniLocalFrame jni_frame;
-  RETURN_IF_ERROR(jni_frame.push(jni_env));
-  jni_env->CallObjectMethod(
-      jni_util_cl_, init_jvm_pm_method, FLAGS_jvm_deadlock_detector_interval_s);
-  RETURN_ERROR_IF_EXC(jni_env);
-  return Status::OK();
+  return JniCall::static_method(jni_util_cl_, init_jvm_pm_method)
+      .with_primitive_arg(FLAGS_jvm_deadlock_detector_interval_s)
+      .Call();
 }
 
 Status JniUtil::GetJniExceptionMsg(JNIEnv* env, bool log_stack, const string& prefix) {
@@ -266,16 +262,18 @@ Status JniUtil::GetJniExceptionMsg(JNIEnv* env, bool log_stack, const string& pr
 
 Status JniUtil::GetJvmMemoryMetrics(const TGetJvmMemoryMetricsRequest& request,
     TGetJvmMemoryMetricsResponse* result) {
-  return JniUtil::CallJniMethod(jni_util_class(), get_jvm_metrics_id_, request, result);
+  return JniCall::static_method(jni_util_class(), get_jvm_metrics_id_)
+      .with_thrift_arg(request).Call(result);
 }
 
 Status JniUtil::GetJvmThreadsInfo(const TGetJvmThreadsInfoRequest& request,
     TGetJvmThreadsInfoResponse* result) {
-  return JniUtil::CallJniMethod(jni_util_class(), get_jvm_threads_id_, request, result);
+  return JniCall::static_method(jni_util_class(), get_jvm_threads_id_)
+      .with_thrift_arg(request).Call(result);
 }
 
 Status JniUtil::GetJMXJson(TGetJMXJsonResponse* result) {
-  return JniUtil::CallJniMethod(jni_util_class(), get_jmx_json_, result);
+  return JniCall::static_method(jni_util_class(), get_jmx_json_).Call(result);
 }
 
 Status JniUtil::LoadJniMethod(JNIEnv* env, const jclass& jni_class,

http://git-wip-us.apache.org/repos/asf/impala/blob/54742300/be/src/util/jni-util.h
----------------------------------------------------------------------
diff --git a/be/src/util/jni-util.h b/be/src/util/jni-util.h
index fd87839..c4030a4 100644
--- a/be/src/util/jni-util.h
+++ b/be/src/util/jni-util.h
@@ -25,6 +25,7 @@
 
 #include "common/status.h"
 #include "gen-cpp/Frontend_types.h"
+#include "gutil/macros.h"
 
 #define THROW_IF_ERROR_WITH_LOGGING(stmt, env, adaptor) \
   do { \
@@ -120,6 +121,11 @@ class JniLocalFrame {
   JniLocalFrame(): env_(NULL) {}
   ~JniLocalFrame() { if (env_ != NULL) env_->PopLocalFrame(NULL); }
 
+  JniLocalFrame(JniLocalFrame&& other) noexcept
+    : env_(other.env_) {
+    other.env_ = nullptr;
+  }
+
   /// Pushes a new JNI local frame. The frame can support max_local_ref local references.
   /// The number of local references created inside the frame might exceed max_local_ref,
   /// but there is no guarantee that memory will be available.
@@ -127,6 +133,8 @@ class JniLocalFrame {
   Status push(JNIEnv* env, int max_local_ref = 10) WARN_UNUSED_RESULT;
 
  private:
+  DISALLOW_COPY_AND_ASSIGN(JniLocalFrame);
+
   JNIEnv* env_;
 };
 
@@ -197,6 +205,86 @@ class JniScopedArrayCritical {
   DISALLOW_COPY_AND_ASSIGN(JniScopedArrayCritical);
 };
 
+/// Utility class for making JNI calls, with various types of argument
+/// or response.
+///
+/// Example usages:
+///
+/// 1) Static call taking a Thrift struct and returning a string:
+///
+///   string s;
+///   RETURN_IF_ERROR(JniCall::static_method(my_jclass, my_method)
+///       .with_thrift_arg(foo).Call(&s));
+///
+/// 2) Non-static call taking no arguments and returning a Thrift struct:
+///
+///   TMyObject result;
+///   RETURN_IF_ERROR(JniCall::instance_method(my_jobject, my_method).Call(&result);
+class JniCall {
+ public:
+   JniCall(JniCall&& other) noexcept = default;
+
+   static JniCall static_method(jclass clazz, jmethodID method) WARN_UNUSED_RESULT {
+     return JniCall(method, clazz);
+   }
+
+   static JniCall instance_method(jobject obj, jmethodID method) WARN_UNUSED_RESULT {
+     return JniCall(method, obj);
+   }
+
+  /// Pass a Thrift-encoded argument. The JNI method should take a byte[] for the
+  /// Thrift-serialized data. Multiple arguments may be passed by repeated calls.
+  template<class T>
+  JniCall& with_thrift_arg(const T& arg) WARN_UNUSED_RESULT;
+
+  /// Pass a primitive arg (eg an integer).
+  /// Multiple arguments may be passed by repeated calls.
+  template<class T>
+  JniCall& with_primitive_arg(T arg) WARN_UNUSED_RESULT;
+
+  /// Call the method expecting no result.
+  Status Call() WARN_UNUSED_RESULT {
+    return Call(static_cast<void*>(nullptr));
+  }
+
+  /// Call the method and return a result (either std::string or a Thrift struct).
+  template<class T>
+  Status Call(T* result) WARN_UNUSED_RESULT;
+
+ private:
+  explicit JniCall(jmethodID method)
+    : method_(method),
+      env_(getJNIEnv()) {
+    status_ = frame_.push(env_);
+  }
+
+  explicit JniCall(jmethodID method, jclass cls) : JniCall(method) {
+    class_ = DCHECK_NOTNULL(cls);
+  }
+
+  explicit JniCall(jmethodID method, jobject instance) : JniCall(method) {
+    instance_ = DCHECK_NOTNULL(instance);
+  }
+
+  template<class T>
+  Status ObjectToResult(jobject obj, T* result) WARN_UNUSED_RESULT;
+
+  Status ObjectToResult(jobject obj, void* no_result) WARN_UNUSED_RESULT;
+
+  Status ObjectToResult(jobject obj, std::string* result) WARN_UNUSED_RESULT;
+
+  const jmethodID method_;
+  JNIEnv* const env_;
+  JniLocalFrame frame_;
+
+  jclass class_ = nullptr;
+  jobject instance_ = nullptr;
+  std::vector<jvalue> args_;
+  Status status_;
+
+  DISALLOW_COPY_AND_ASSIGN(JniCall);
+};
+
 
 /// Utility class for JNI-related functionality.
 /// Init() should be called as soon as the native library is loaded.
@@ -306,74 +394,28 @@ class JniUtil {
       JniMethodDescriptor* descriptor) WARN_UNUSED_RESULT;
 
   /// Utility methods to avoid repeating lots of the JNI call boilerplate.
+  /// New code should prefer using JniCall() directly for better clarity.
   static Status CallJniMethod(
       const jobject& obj, const jmethodID& method) WARN_UNUSED_RESULT {
-    JNIEnv* jni_env = getJNIEnv();
-    JniLocalFrame jni_frame;
-    RETURN_IF_ERROR(jni_frame.push(jni_env));
-    jni_env->CallObjectMethod(obj, method);
-    RETURN_ERROR_IF_EXC(jni_env);
-    return Status::OK();
+    return JniCall::instance_method(obj, method).Call();
   }
 
-  template <typename T>
-  static Status CallJniMethod(const jobject& obj, const jmethodID& method, const T& arg) {
-    JNIEnv* jni_env = getJNIEnv();
-    jbyteArray request_bytes;
-    JniLocalFrame jni_frame;
-    RETURN_IF_ERROR(jni_frame.push(jni_env));
-    RETURN_IF_ERROR(SerializeThriftMsg(jni_env, &arg, &request_bytes));
-    jni_env->CallObjectMethod(obj, method, request_bytes);
-    RETURN_ERROR_IF_EXC(jni_env);
-    return Status::OK();
+  static Status CallStaticJniMethod(
+      const jclass& cls, const jmethodID& method) WARN_UNUSED_RESULT {
+    return JniCall::static_method(cls, method).Call();
   }
 
-  template <typename R>
-  static Status CallJniMethod(const jobject& obj, const jmethodID& method, R* response) {
-    JNIEnv* jni_env = getJNIEnv();
-    JniLocalFrame jni_frame;
-    RETURN_IF_ERROR(jni_frame.push(jni_env));
-    jbyteArray result_bytes = static_cast<jbyteArray>(
-        jni_env->CallObjectMethod(obj, method));
-    RETURN_ERROR_IF_EXC(jni_env);
-    RETURN_IF_ERROR(DeserializeThriftMsg(jni_env, result_bytes, response));
-    return Status::OK();
-  }
+  template <typename T>
+  static Status CallJniMethod(const jobject& obj, const jmethodID& method,
+      const T& arg) WARN_UNUSED_RESULT;
 
   template <typename T, typename R>
   static Status CallJniMethod(const jobject& obj, const jmethodID& method,
-      const T& arg, R* response) {
-    JNIEnv* jni_env = getJNIEnv();
-    jbyteArray request_bytes;
-    JniLocalFrame jni_frame;
-    RETURN_IF_ERROR(jni_frame.push(jni_env));
-    RETURN_IF_ERROR(SerializeThriftMsg(jni_env, &arg, &request_bytes));
-    jbyteArray result_bytes = static_cast<jbyteArray>(
-        jni_env->CallObjectMethod(obj, method, request_bytes));
-    RETURN_ERROR_IF_EXC(jni_env);
-    RETURN_IF_ERROR(DeserializeThriftMsg(jni_env, result_bytes, response));
-    return Status::OK();
-  }
+      const T& arg, R* response) WARN_UNUSED_RESULT;
 
-  template <typename T>
+  template <typename R>
   static Status CallJniMethod(const jobject& obj, const jmethodID& method,
-      const T& arg, std::string* response) {
-    JNIEnv* jni_env = getJNIEnv();
-    jbyteArray request_bytes;
-    JniLocalFrame jni_frame;
-    RETURN_IF_ERROR(jni_frame.push(jni_env));
-    RETURN_IF_ERROR(SerializeThriftMsg(jni_env, &arg, &request_bytes));
-    jstring java_response_string = static_cast<jstring>(
-        jni_env->CallObjectMethod(obj, method, request_bytes));
-    RETURN_ERROR_IF_EXC(jni_env);
-    jboolean is_copy;
-    const char *str = jni_env->GetStringUTFChars(java_response_string, &is_copy);
-    RETURN_ERROR_IF_EXC(jni_env);
-    *response = str;
-    jni_env->ReleaseStringUTFChars(java_response_string, str);
-    RETURN_ERROR_IF_EXC(jni_env);
-    return Status::OK();
-  }
+      R* response) WARN_UNUSED_RESULT;
 
  private:
   // Set in Init() once the JVM is initialized.
@@ -387,6 +429,105 @@ class JniUtil {
   static jmethodID get_jmx_json_;
 };
 
+/// Convert a C++ primitive to a JNI 'jvalue' union.
+/// See https://docs.oracle.com/javase/7/docs/technotes/guides/jni/spec/types.html
+/// for reference on the union members.
+template<typename T>
+jvalue PrimitiveToValue(T cpp_val);
+
+#define SPECIALIZE_PRIMITIVE_TO_VALUE(cpp_type, union_field)    \
+  template<> inline jvalue PrimitiveToValue(cpp_type cpp_val) { \
+    jvalue v;                                                   \
+    memset(&v, 0, sizeof(v));                                    \
+    v.union_field = cpp_val;                                    \
+    return v;                                                   \
+  }
+SPECIALIZE_PRIMITIVE_TO_VALUE(bool, z);
+SPECIALIZE_PRIMITIVE_TO_VALUE(int8_t, b);
+SPECIALIZE_PRIMITIVE_TO_VALUE(char, c);
+SPECIALIZE_PRIMITIVE_TO_VALUE(int16_t, s);
+SPECIALIZE_PRIMITIVE_TO_VALUE(int32_t, i);
+SPECIALIZE_PRIMITIVE_TO_VALUE(int64_t, j);
+SPECIALIZE_PRIMITIVE_TO_VALUE(float, f);
+SPECIALIZE_PRIMITIVE_TO_VALUE(double, d);
+#undef SPECIALIZE_PRIMITIVE_TO_VALUE
+
+template <typename T>
+inline Status JniUtil::CallJniMethod(const jobject& obj, const jmethodID& method,
+    const T& arg) {
+  return JniCall::instance_method(obj, method).with_thrift_arg(arg).Call();
+}
+
+template <typename T, typename R>
+inline Status JniUtil::CallJniMethod(const jobject& obj, const jmethodID& method,
+    const T& arg, R* response) {
+  return JniCall::instance_method(obj, method).with_thrift_arg(arg).Call(response);
+}
+
+template <typename R>
+inline Status JniUtil::CallJniMethod(const jobject& obj, const jmethodID& method,
+    R* response) {
+  return JniCall::instance_method(obj, method).Call(response);
+}
+
+template<class T>
+inline JniCall& JniCall::with_thrift_arg(const T& arg) {
+  if (!status_.ok()) return *this;
+  jbyteArray bytes;
+  status_ = SerializeThriftMsg(env_, &arg, &bytes);
+  if (status_.ok()) {
+    jvalue arg;
+    memset(&arg, 0, sizeof(arg));
+    arg.l = bytes;
+    args_.emplace_back(arg);
+  }
+  return *this;
+}
+template<class T>
+inline JniCall& JniCall::with_primitive_arg(T arg) {
+  if (!status_.ok()) return *this;
+  args_.emplace_back(PrimitiveToValue(arg));
+  return *this;
+}
+
+template<class T>
+inline Status JniCall::Call(T* result) {
+  RETURN_IF_ERROR(status_);
+  DCHECK((instance_ != nullptr) ^ (class_ != nullptr));
+
+  // Even if the function takes no arguments, it's OK to pass an array here.
+  // The JNI API doesn't take a length and just assumes that you've passed
+  // an appropriate number of elements.
+  jobject ret;
+  if (class_) {
+    ret = env_->CallStaticObjectMethodA(class_, method_, args_.data());
+  } else {
+    ret = env_->CallObjectMethodA(instance_, method_, args_.data());
+  }
+  RETURN_ERROR_IF_EXC(env_);
+  RETURN_IF_ERROR(ObjectToResult(ret, result));
+  return Status::OK();
+}
+
+template<class T>
+inline Status JniCall::ObjectToResult(jobject obj, T* result) {
+  DCHECK(obj) << "Call returned unexpected null Thrift object";
+  RETURN_IF_ERROR(DeserializeThriftMsg(env_, static_cast<jbyteArray>(obj), result));
+  return Status::OK();
+}
+
+inline Status JniCall::ObjectToResult(jobject obj, void* no_result) {
+  return Status::OK();
+}
+
+inline Status JniCall::ObjectToResult(jobject obj, std::string* result) {
+  DCHECK(obj) << "Call returned unexpected null String instance";
+  JniUtfCharGuard utf;
+  RETURN_IF_ERROR(JniUtfCharGuard::create(env_, static_cast<jstring>(obj), &utf));
+  *result = utf.get();;
+  return Status::OK();
+}
+
 }
 
 #endif

http://git-wip-us.apache.org/repos/asf/impala/blob/54742300/be/src/util/logging-support.cc
----------------------------------------------------------------------
diff --git a/be/src/util/logging-support.cc b/be/src/util/logging-support.cc
index c037eb7..457c8f9 100644
--- a/be/src/util/logging-support.cc
+++ b/be/src/util/logging-support.cc
@@ -24,6 +24,7 @@
 
 #include "common/logging.h"
 #include "rpc/jni-thrift-util.h"
+#include "util/jni-util.h"
 #include "util/webserver.h"
 
 #include "common/names.h"
@@ -134,15 +135,12 @@ void InitDynamicLoggingSupport() {
 // Helper method to get the log level of given Java class. It is a JNI wrapper around
 // GlogAppender.getLogLevel().
 Status GetJavaLogLevel(const TGetJavaLogLevelParams& params, string* result) {
-  RETURN_IF_ERROR(
-      JniUtil::CallJniMethod(log4j_logger_class_, get_log_level_method, params, result));
-  return Status::OK();
+  return JniCall::static_method(log4j_logger_class_, get_log_level_method)
+      .with_thrift_arg(params).Call(result);
 }
 
 Status ResetJavaLogLevels() {
-  RETURN_IF_ERROR(
-      JniUtil::CallJniMethod(log4j_logger_class_, reset_log_levels_method));
-  return Status::OK();
+  return JniCall::static_method(log4j_logger_class_, reset_log_levels_method).Call();
 }
 
 // Callback handler for /get_java_loglevel.
@@ -181,8 +179,8 @@ void SetJavaLogLevelCallback(const Webserver::ArgumentMap& args, Document* docum
   string result;
   params.__set_class_name(classname->second);
   params.__set_log_level(level->second);
-  Status status =
-      JniUtil::CallJniMethod(log4j_logger_class_, set_log_level_method, params, &result);
+  Status status = JniCall::static_method(log4j_logger_class_, set_log_level_method)
+    .with_thrift_arg(params).Call(&result);
   if (!status.ok()) {
     AddDocumentMember(status.GetDetail(), "error", document);
     return;

http://git-wip-us.apache.org/repos/asf/impala/blob/54742300/be/src/util/logging-support.h
----------------------------------------------------------------------
diff --git a/be/src/util/logging-support.h b/be/src/util/logging-support.h
index 87579af..2e7b73c 100644
--- a/be/src/util/logging-support.h
+++ b/be/src/util/logging-support.h
@@ -18,7 +18,6 @@
 #ifndef IMPALA_UTIL_LOGGING_SUPPORT_H
 #define IMPALA_UTIL_LOGGING_SUPPORT_H
 
-#include "util/jni-util.h"
 #include "gen-cpp/Logging_types.h"
 
 namespace impala {

http://git-wip-us.apache.org/repos/asf/impala/blob/54742300/be/src/util/zip-util.cc
----------------------------------------------------------------------
diff --git a/be/src/util/zip-util.cc b/be/src/util/zip-util.cc
index b8777fa..f6e1f15 100644
--- a/be/src/util/zip-util.cc
+++ b/be/src/util/zip-util.cc
@@ -42,7 +42,8 @@ Status ZipUtil::ExtractFiles(const string& archive_file, const string& destinati
   TExtractFromZipParams params;
   params.archive_file = archive_file;
   params.destination_dir = destination_dir;
-  return JniUtil::CallJniMethod(zip_util_class_, extract_files_method, params);
+  return JniCall::static_method(zip_util_class_, extract_files_method)
+      .with_thrift_arg(params).Call();
 }
 
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/54742300/be/src/util/zip-util.h
----------------------------------------------------------------------
diff --git a/be/src/util/zip-util.h b/be/src/util/zip-util.h
index 3674ba9..959753b 100644
--- a/be/src/util/zip-util.h
+++ b/be/src/util/zip-util.h
@@ -20,7 +20,8 @@
 
 #include <string>
 
-#include "util/jni-util.h"
+#include <jni.h>
+
 #include "common/status.h"
 
 namespace impala {


[7/7] impala git commit: Add missing authorization in KRPC

Posted by kw...@apache.org.
Add missing authorization in KRPC

In 2.12.0, Impala adopted the Kudu RPC library for certain backend services
(TransmitData(), EndDataStream()). While the implementation uses Kerberos
for authenticating users connecting to the backend services, there is no
authorization implemented. This is a regression from the Thrift based
implementation because it registered a SASL callback (SaslAuthorizeInternal)
to be invoked during the connection negotiation. With this regression,
an unauthorized but authenticated user may invoke RPC calls to Impala backend
services.

This change fixes the issue above by overriding the default authorization method
for the DataStreamService. The authorization method will only let an authenticated
principal which matches FLAGS_principal / FLAGS_be_principal access the service.
Also added a new startup flag --krb5_ccname to allow users to customize the location
of the Kerberos credentials cache.

Testing done:
1. Added a new test case in rpc-mgr-kerberized-test.cc to confirm an unauthorized
user is not allowed to access the service.
2. Ran some queries in a Kerberos enabled cluster to make sure there is no error.
3. Exhaustive builds.

Thanks to Todd Lipcon for pointing out the problem and his guidance on the fix.

Change-Id: I2f82dee5e721f2ed23e75fd91abbc6ab7addd4c5
Reviewed-on: http://gerrit.cloudera.org:8080/11331
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/5c541b96
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/5c541b96
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/5c541b96

Branch: refs/heads/master
Commit: 5c541b960491ba91533712144599fb3b6d99521d
Parents: b97e5ba
Author: Michael Ho <kw...@cloudera.com>
Authored: Thu Aug 23 00:33:16 2018 -0700
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Thu Aug 30 04:06:09 2018 +0000

----------------------------------------------------------------------
 be/src/common/global-flags.cc             |   2 +
 be/src/rpc/authentication.cc              |  16 +-
 be/src/rpc/rpc-mgr-kerberized-test.cc     | 127 +++++++++--
 be/src/rpc/rpc-mgr-test-base.h            | 284 -----------------------
 be/src/rpc/rpc-mgr-test.cc                |  39 ++--
 be/src/rpc/rpc-mgr-test.h                 | 300 +++++++++++++++++++++++++
 be/src/rpc/rpc-mgr.cc                     |  33 ++-
 be/src/rpc/rpc-mgr.h                      |  10 +
 be/src/rpc/thrift-server-test.cc          |  19 +-
 be/src/runtime/data-stream-test.cc        |   5 +
 be/src/service/data-stream-service.cc     |   6 +
 be/src/service/data-stream-service.h      |   6 +
 be/src/testutil/mini-kdc-wrapper.cc       |  32 ++-
 be/src/testutil/mini-kdc-wrapper.h        |  44 ++--
 be/src/util/auth-util.cc                  |  37 ++-
 be/src/util/auth-util.h                   |   6 +-
 bin/rat_exclude_files.txt                 |   1 +
 common/protobuf/data_stream_service.proto |   5 +
 common/protobuf/kudu                      |   1 +
 common/protobuf/rpc_test.proto            |   6 +
 20 files changed, 588 insertions(+), 391 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/5c541b96/be/src/common/global-flags.cc
----------------------------------------------------------------------
diff --git a/be/src/common/global-flags.cc b/be/src/common/global-flags.cc
index 5c5dc03..1267072 100644
--- a/be/src/common/global-flags.cc
+++ b/be/src/common/global-flags.cc
@@ -47,6 +47,8 @@ DEFINE_string(principal, "", "Kerberos principal. If set, both client and backen
 DEFINE_string(be_principal, "", "Kerberos principal for backend network connections only,"
     "overriding --principal if set. Must not be set if --principal is not set.");
 DEFINE_string(keytab_file, "", "Absolute path to Kerberos keytab file");
+DEFINE_string(krb5_ccname, "/tmp/krb5cc_impala_internal", "Absolute path to the file "
+    "based credentials cache that we pass to the KRB5CCNAME environment variable.");
 DEFINE_string(krb5_conf, "", "Absolute path to Kerberos krb5.conf if in a non-standard "
     "location. Does not normally need to be set.");
 DEFINE_string(krb5_debug_file, "", "Turn on Kerberos debugging and output to this file");

http://git-wip-us.apache.org/repos/asf/impala/blob/5c541b96/be/src/rpc/authentication.cc
----------------------------------------------------------------------
diff --git a/be/src/rpc/authentication.cc b/be/src/rpc/authentication.cc
index c22ab88..0b5b5d9 100644
--- a/be/src/rpc/authentication.cc
+++ b/be/src/rpc/authentication.cc
@@ -62,12 +62,13 @@ using boost::algorithm::trim;
 using boost::mt19937;
 using boost::uniform_int;
 using namespace apache::thrift;
-using namespace boost::filesystem;   // for is_regular()
+using namespace boost::filesystem;   // for is_regular(), is_absolute()
 using namespace strings;
 
 DECLARE_string(keytab_file);
 DECLARE_string(principal);
 DECLARE_string(be_principal);
+DECLARE_string(krb5_ccname);
 DECLARE_string(krb5_conf);
 DECLARE_string(krb5_debug_file);
 
@@ -123,10 +124,6 @@ static vector<sasl_callback_t> LDAP_EXT_CALLBACKS;  // External LDAP connections
 // the same 'appname' or InitAuth() will fail.
 static string APP_NAME;
 
-// Path to the file based credential cache that we pass to the KRB5CCNAME environment
-// variable.
-static const string KRB5CCNAME_PATH = "/tmp/krb5cc_impala_internal";
-
 // Constants for the two Sasl mechanisms we support
 static const string KERBEROS_MECHANISM = "GSSAPI";
 static const string PLAIN_MECHANISM = "PLAIN";
@@ -732,7 +729,12 @@ Status AuthManager::InitKerberosEnv() {
   // is normally fine, but if you're not running impala daemons as user
   // 'impala', the kinit we perform is going to blow away credentials for the
   // current user.  Not setting this isn't technically fatal, so ignore errors.
-  (void) setenv("KRB5CCNAME", "/tmp/krb5cc_impala_internal", 1);
+  const path krb5_ccname_path(FLAGS_krb5_ccname);
+  if (!krb5_ccname_path.is_absolute()) {
+    return Status(Substitute("Bad --krb5_ccname value: $0 is not an absolute file path",
+        FLAGS_krb5_ccname));
+  }
+  discard_result(setenv("KRB5CCNAME", FLAGS_krb5_ccname.c_str(), 1));
 
   // If an alternate krb5_conf location is supplied, set both KRB5_CONFIG and
   // JAVA_TOOL_OPTIONS in the environment.
@@ -785,7 +787,7 @@ Status SaslAuthProvider::Start() {
     // Starts a thread that periodically does a 'kinit'. The thread lives as long as the
     // process does.
     KUDU_RETURN_IF_ERROR(kudu::security::InitKerberosForServer(principal_, keytab_file_,
-        KRB5CCNAME_PATH, false), "Could not init kerberos");
+        FLAGS_krb5_ccname, false), "Could not init kerberos");
     LOG(INFO) << "Kerberos ticket granted to " << principal_;
   }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/5c541b96/be/src/rpc/rpc-mgr-kerberized-test.cc
----------------------------------------------------------------------
diff --git a/be/src/rpc/rpc-mgr-kerberized-test.cc b/be/src/rpc/rpc-mgr-kerberized-test.cc
index c6b95c8..0121787 100644
--- a/be/src/rpc/rpc-mgr-kerberized-test.cc
+++ b/be/src/rpc/rpc-mgr-kerberized-test.cc
@@ -15,11 +15,17 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include "rpc/rpc-mgr-test-base.h"
+#include "rpc/rpc-mgr-test.h"
+
+#include "exec/kudu-util.h"
+#include "rpc/auth-provider.h"
 #include "service/fe-support.h"
+#include "testutil/mini-kdc-wrapper.h"
 
 DECLARE_string(be_principal);
 DECLARE_string(hostname);
+DECLARE_string(keytab_file);
+DECLARE_string(krb5_ccname);
 DECLARE_string(principal);
 DECLARE_string(ssl_client_ca_certificate);
 DECLARE_string(ssl_server_certificate);
@@ -27,32 +33,33 @@ DECLARE_string(ssl_private_key);
 
 // The principal name and the realm used for creating the mini-KDC.
 // To be initialized at main().
-static string kdc_principal;
-static string kdc_realm;
+static string kdc_ccname;
+static string principal;
+static string principal_kt_path;
+static string realm;
 
 namespace impala {
 
-class RpcMgrKerberizedTest :
-    public RpcMgrTestBase<testing::TestWithParam<KerberosSwitch> > {
-
+class RpcMgrKerberizedTest : public RpcMgrTest {
   virtual void SetUp() override {
-    FLAGS_principal = "dummy-service/host@realm";
-    FLAGS_be_principal = strings::Substitute("$0@$1", kdc_principal, kdc_realm);
+    FLAGS_principal = "dummy/host@realm";
+    FLAGS_be_principal = strings::Substitute("$0@$1", principal, realm);
+    FLAGS_keytab_file = principal_kt_path;
+    FLAGS_krb5_ccname = "/tmp/krb5cc_impala_internal";
     ASSERT_OK(InitAuth(CURRENT_EXECUTABLE_PATH));
-    RpcMgrTestBase::SetUp();
+    RpcMgrTest::SetUp();
   }
 
   virtual void TearDown() override {
     FLAGS_principal.clear();
     FLAGS_be_principal.clear();
+    FLAGS_keytab_file.clear();
+    FLAGS_krb5_ccname.clear();
+    RpcMgrTest::TearDown();
   }
 };
 
-INSTANTIATE_TEST_CASE_P(KerberosOnAndOff,
-                        RpcMgrKerberizedTest,
-                        ::testing::Values(KERBEROS_ON));
-
-TEST_P(RpcMgrKerberizedTest, MultipleServicesTls) {
+TEST_F(RpcMgrKerberizedTest, MultipleServicesTls) {
   // TODO: We're starting a seperate RpcMgr here instead of configuring
   // RpcTestBase::rpc_mgr_ to use TLS. To use RpcTestBase::rpc_mgr_, we need to introduce
   // new gtest params to turn on TLS which needs to be a coordinated change across
@@ -69,10 +76,78 @@ TEST_P(RpcMgrKerberizedTest, MultipleServicesTls) {
   ScopedSetTlsFlags s(SERVER_CERT, PRIVATE_KEY, SERVER_CERT);
   ASSERT_OK(tls_rpc_mgr.Init());
 
-  ASSERT_OK(RunMultipleServicesTestTemplate(this, &tls_rpc_mgr, tls_krpc_address));
+  ASSERT_OK(RunMultipleServicesTest(&tls_rpc_mgr, tls_krpc_address));
   tls_rpc_mgr.Shutdown();
 }
 
+// This test aims to exercise the authorization function in RpcMgr by accessing
+// services with the credentials of a principal different from FLAGS_be_principal.
+TEST_F(RpcMgrKerberizedTest, AuthorizationFail) {
+  GeneratedServiceIf* ping_impl =
+      TakeOverService(make_unique<PingServiceImpl>(&rpc_mgr_));
+  GeneratedServiceIf* scan_mem_impl =
+      TakeOverService(make_unique<ScanMemServiceImpl>(&rpc_mgr_));
+  const int num_service_threads = 10;
+  const int queue_size = 10;
+  ASSERT_OK(rpc_mgr_.RegisterService(num_service_threads, queue_size, ping_impl,
+      static_cast<PingServiceImpl*>(ping_impl)->mem_tracker()));
+  ASSERT_OK(rpc_mgr_.RegisterService(num_service_threads, queue_size, scan_mem_impl,
+      static_cast<ScanMemServiceImpl*>(scan_mem_impl)->mem_tracker()));
+  FLAGS_num_acceptor_threads = 2;
+  FLAGS_num_reactor_threads = 10;
+  ASSERT_OK(rpc_mgr_.StartServices(krpc_address_));
+
+  // Switch over to a credentials cache which only contains the dummy credential (set
+  // up in main()). Kinit done in InitAuth() uses a different credentials cache.
+  DCHECK_NE(FLAGS_krb5_ccname, kdc_ccname);
+  discard_result(setenv("KRB5CCNAME", kdc_ccname.c_str(), 1));
+  // NOTE(review): KRB5CCNAME is not restored by this test or by TearDown() -- confirm.
+  RpcController controller;
+  Status rpc_status;
+
+  // ScanMemService's authorization function always returns true so we should be able
+  // to access with dummy credentials.
+  unique_ptr<ScanMemServiceProxy> scan_proxy;
+  ASSERT_OK(rpc_mgr_.GetProxy<ScanMemServiceProxy>(
+      krpc_address_, FLAGS_hostname, &scan_proxy));
+  ScanMemRequestPB scan_request;
+  ScanMemResponsePB scan_response;
+  SetupScanMemRequest(&scan_request, &controller);
+  controller.Reset();
+  rpc_status =
+      FromKuduStatus(scan_proxy->ScanMem(scan_request, &scan_response, &controller));
+  EXPECT_TRUE(rpc_status.ok());
+
+  // Access to PingService must fail as it expects FLAGS_be_principal as principal name.
+  unique_ptr<PingServiceProxy> ping_proxy;
+  ASSERT_OK(rpc_mgr_.GetProxy<PingServiceProxy>(
+      krpc_address_, FLAGS_hostname, &ping_proxy));
+  PingRequestPB ping_request;
+  PingResponsePB ping_response;
+  controller.Reset();
+  rpc_status =
+      FromKuduStatus(ping_proxy->Ping(ping_request, &ping_response, &controller));
+  EXPECT_TRUE(!rpc_status.ok());
+  const string& err_string =
+      "Not authorized: {username='dummy', principal='dummy/host@KRBTEST.COM'}";
+  EXPECT_NE(rpc_status.GetDetail().find(err_string), string::npos);
+}
+
+// Checks that InitAuth() rejects --krb5_ccname values which are not absolute file paths.
+TEST_F(RpcMgrKerberizedTest, BadCredentialsCachePath) {
+  FLAGS_krb5_ccname = "MEMORY:foo";
+  Status memory_cache_status = InitAuth(CURRENT_EXECUTABLE_PATH);
+  ASSERT_FALSE(memory_cache_status.ok());
+  EXPECT_EQ(memory_cache_status.GetDetail(),
+      "Bad --krb5_ccname value: MEMORY:foo is not an absolute file path\n");
+
+  FLAGS_krb5_ccname = "~/foo";
+  Status tilde_path_status = InitAuth(CURRENT_EXECUTABLE_PATH);
+  ASSERT_FALSE(tilde_path_status.ok());
+  EXPECT_EQ(tilde_path_status.GetDetail(),
+      "Bad --krb5_ccname value: ~/foo is not an absolute file path\n");
+}
+
 } // namespace impala
 
 using impala::Status;
@@ -86,15 +161,29 @@ int main(int argc, char** argv) {
   impala::IpAddr ip;
   impala::Status status = impala::HostnameToIpAddr(FLAGS_hostname, &ip);
   DCHECK(status.ok());
-  kdc_principal = Substitute("impala-test/$0", FLAGS_hostname);
-  kdc_realm = "KRBTEST.COM";
+  principal = Substitute("impala-test/$0", FLAGS_hostname);
+  realm = "KRBTEST.COM";
 
   int port = impala::FindUnusedEphemeralPort();
   std::unique_ptr<impala::MiniKdcWrapper> kdc;
-  status = impala::MiniKdcWrapper::SetupAndStartMiniKDC(
-      kdc_principal, kdc_realm, "24h", "7d", port, &kdc);
+  status = impala::MiniKdcWrapper::SetupAndStartMiniKDC(realm, "24h", "7d", port, &kdc);
   DCHECK(status.ok());
 
+  // Create a valid service principal and the associated keytab used for this test.
+  status = kdc->CreateServiceKeytab(principal, &principal_kt_path);
+  DCHECK(status.ok());
+
+  // Create a dummy service principal which is not authorized to access PingService.
+  const string& dummy_principal = "dummy/host";
+  status = kdc->CreateUserPrincipal(dummy_principal);
+  DCHECK(status.ok());
+  status = kdc->Kinit(dummy_principal);
+  DCHECK(status.ok());
+
+  // Get "KRB5CCNAME" set up by mini-kdc. It's the credentials cache which contains
+  // the dummy service's key
+  kdc_ccname = kdc->GetKrb5CCname();
+
   // Fill in the path of the current binary for use by the tests.
   CURRENT_EXECUTABLE_PATH = argv[0];
   int retval = RUN_ALL_TESTS();

http://git-wip-us.apache.org/repos/asf/impala/blob/5c541b96/be/src/rpc/rpc-mgr-test-base.h
----------------------------------------------------------------------
diff --git a/be/src/rpc/rpc-mgr-test-base.h b/be/src/rpc/rpc-mgr-test-base.h
deleted file mode 100644
index 258bd4b..0000000
--- a/be/src/rpc/rpc-mgr-test-base.h
+++ /dev/null
@@ -1,284 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "rpc/rpc-mgr.inline.h"
-
-#include "common/init.h"
-#include "exec/kudu-util.h"
-#include "kudu/rpc/rpc_context.h"
-#include "kudu/rpc/rpc_controller.h"
-#include "kudu/rpc/rpc_header.pb.h"
-#include "kudu/rpc/rpc_sidecar.h"
-#include "kudu/util/monotime.h"
-#include "kudu/util/status.h"
-#include "rpc/auth-provider.h"
-#include "runtime/exec-env.h"
-#include "runtime/mem-tracker.h"
-#include "testutil/gtest-util.h"
-#include "testutil/mini-kdc-wrapper.h"
-#include "testutil/scoped-flag-setter.h"
-#include "util/counting-barrier.h"
-#include "util/network-util.h"
-#include "util/openssl-util.h"
-#include "util/test-info.h"
-
-#include "gen-cpp/rpc_test.proxy.h"
-#include "gen-cpp/rpc_test.service.h"
-#include "gen-cpp/rpc_test.pb.h"
-
-#include "common/names.h"
-
-using kudu::rpc::GeneratedServiceIf;
-using kudu::rpc::RpcController;
-using kudu::rpc::RpcContext;
-using kudu::rpc::RpcSidecar;
-using kudu::Slice;
-
-using namespace std;
-
-DECLARE_int32(num_reactor_threads);
-DECLARE_int32(num_acceptor_threads);
-DECLARE_string(hostname);
-
-DECLARE_string(ssl_client_ca_certificate);
-DECLARE_string(ssl_server_certificate);
-DECLARE_string(ssl_private_key);
-DECLARE_string(ssl_private_key_password_cmd);
-DECLARE_string(ssl_cipher_list);
-
-// The path of the current executable file that is required for passing into the SASL
-// library as the 'application name'.
-static string CURRENT_EXECUTABLE_PATH;
-
-namespace impala {
-
-static int32_t SERVICE_PORT = FindUnusedEphemeralPort();
-
-const static string IMPALA_HOME(getenv("IMPALA_HOME"));
-const string& SERVER_CERT =
-    Substitute("$0/be/src/testutil/server-cert.pem", IMPALA_HOME);
-const string& PRIVATE_KEY =
-    Substitute("$0/be/src/testutil/server-key.pem", IMPALA_HOME);
-const string& BAD_SERVER_CERT =
-    Substitute("$0/be/src/testutil/bad-cert.pem", IMPALA_HOME);
-const string& BAD_PRIVATE_KEY =
-    Substitute("$0/be/src/testutil/bad-key.pem", IMPALA_HOME);
-const string& PASSWORD_PROTECTED_PRIVATE_KEY =
-    Substitute("$0/be/src/testutil/server-key-password.pem", IMPALA_HOME);
-
-/// Use this class to set the appropriate required TLS flags for the duration of the
-/// lifetime of the object.
-/// It is assumed that the flags always hold empty values by default.
-class ScopedSetTlsFlags {
- public:
-  ScopedSetTlsFlags(const string& cert, const string& pkey, const string& ca_cert,
-      const string& pkey_passwd = "", const string& ciphers = "") {
-    FLAGS_ssl_server_certificate = cert;
-    FLAGS_ssl_private_key = pkey;
-    FLAGS_ssl_client_ca_certificate = ca_cert;
-    FLAGS_ssl_private_key_password_cmd = pkey_passwd;
-    FLAGS_ssl_cipher_list = ciphers;
-  }
-
-  ~ScopedSetTlsFlags() {
-    FLAGS_ssl_server_certificate = "";
-    FLAGS_ssl_private_key = "";
-    FLAGS_ssl_client_ca_certificate = "";
-    FLAGS_ssl_private_key_password_cmd = "";
-    FLAGS_ssl_cipher_list = "";
-  }
-};
-
-// Only use TLSv1.0 compatible ciphers, as tests might run on machines with only TLSv1.0
-// support.
-const string TLS1_0_COMPATIBLE_CIPHER = "RC4-SHA";
-const string TLS1_0_COMPATIBLE_CIPHER_2 = "RC4-MD5";
-
-#define PAYLOAD_SIZE (4096)
-
-template <class T> class RpcMgrTestBase : public T {
- public:
-  // Utility function to initialize the parameter for ScanMem RPC.
-  // Picks a random value and fills 'payload_' with it. Adds 'payload_' as a sidecar
-  // to 'controller'. Also sets up 'request' with the random value and index of the
-  // sidecar.
-  void SetupScanMemRequest(ScanMemRequestPB* request, RpcController* controller) {
-    int32_t pattern = random();
-    for (int i = 0; i < PAYLOAD_SIZE / sizeof(int32_t); ++i) payload_[i] = pattern;
-    int idx;
-    Slice slice(reinterpret_cast<const uint8_t*>(payload_), PAYLOAD_SIZE);
-    controller->AddOutboundSidecar(RpcSidecar::FromSlice(slice), &idx);
-    request->set_pattern(pattern);
-    request->set_sidecar_idx(idx);
-  }
-
-  // Takes over ownership of the newly created 'service' which needs to have a lifetime
-  // as long as 'rpc_mgr_' as RpcMgr::Shutdown() will call Shutdown() of 'service'.
-  GeneratedServiceIf* TakeOverService(std::unique_ptr<GeneratedServiceIf> service) {
-    services_.emplace_back(move(service));
-    return services_.back().get();
-  }
-
- protected:
-  TNetworkAddress krpc_address_;
-  RpcMgr rpc_mgr_;
-
-  virtual void SetUp() override {
-    IpAddr ip;
-    ASSERT_OK(HostnameToIpAddr(FLAGS_hostname, &ip));
-    krpc_address_ = MakeNetworkAddress(ip, SERVICE_PORT);
-    exec_env_.reset(new ExecEnv());
-    ASSERT_OK(rpc_mgr_.Init());
-  }
-
-  virtual void TearDown() override {
-    rpc_mgr_.Shutdown();
-  }
-
- private:
-  int32_t payload_[PAYLOAD_SIZE];
-
-  // Own all the services used by the test.
-  std::vector<std::unique_ptr<GeneratedServiceIf>> services_;
-
-  // Required to set up the RPC metric groups used by the service pool.
-  std::unique_ptr<ExecEnv> exec_env_;
-};
-
-typedef std::function<void(RpcContext*)> ServiceCB;
-
-class PingServiceImpl : public PingServiceIf {
- public:
-  // 'cb' is a callback used by tests to inject custom behaviour into the RPC handler.
-  PingServiceImpl(const scoped_refptr<kudu::MetricEntity>& entity,
-      const scoped_refptr<kudu::rpc::ResultTracker> tracker,
-      ServiceCB cb = [](RpcContext* ctx) { ctx->RespondSuccess(); })
-    : PingServiceIf(entity, tracker), mem_tracker_(-1, "Ping Service"), cb_(cb) {}
-
-  virtual void Ping(const PingRequestPB* request, PingResponsePB* response, RpcContext*
-      context) override {
-    response->set_int_response(42);
-    // Incoming requests will already be tracked and we need to release the memory.
-    mem_tracker_.Release(context->GetTransferSize());
-    cb_(context);
-  }
-
-  MemTracker* mem_tracker() { return &mem_tracker_; }
-
- private:
-  MemTracker mem_tracker_;
-  ServiceCB cb_;
-};
-
-class ScanMemServiceImpl : public ScanMemServiceIf {
- public:
-  ScanMemServiceImpl(const scoped_refptr<kudu::MetricEntity>& entity,
-      const scoped_refptr<kudu::rpc::ResultTracker> tracker)
-    : ScanMemServiceIf(entity, tracker), mem_tracker_(-1, "ScanMem Service") {
-  }
-
-  // The request comes with an int 'pattern' and a payload of int array sent with
-  // sidecar. Scan the array to make sure every element matches 'pattern'.
-  virtual void ScanMem(const ScanMemRequestPB* request, ScanMemResponsePB* response,
-      RpcContext* context) override {
-    int32_t pattern = request->pattern();
-    Slice payload;
-    ASSERT_OK(
-        FromKuduStatus(context->GetInboundSidecar(request->sidecar_idx(), &payload)));
-    ASSERT_EQ(payload.size() % sizeof(int32_t), 0);
-
-    const int32_t* v = reinterpret_cast<const int32_t*>(payload.data());
-    for (int i = 0; i < payload.size() / sizeof(int32_t); ++i) {
-      int32_t val = v[i];
-      if (val != pattern) {
-        // Incoming requests will already be tracked and we need to release the memory.
-        mem_tracker_.Release(context->GetTransferSize());
-        context->RespondFailure(kudu::Status::Corruption(
-            Substitute("Expecting $1; Found $2", pattern, val)));
-        return;
-      }
-    }
-    // Incoming requests will already be tracked and we need to release the memory.
-    mem_tracker_.Release(context->GetTransferSize());
-    context->RespondSuccess();
-  }
-
-  MemTracker* mem_tracker() { return &mem_tracker_; }
-
- private:
-  MemTracker mem_tracker_;
-
-};
-
-template <class T>
-Status RunMultipleServicesTestTemplate(RpcMgrTestBase<T>* test_base,
-    RpcMgr* rpc_mgr, const TNetworkAddress& krpc_address) {
-  // Test that a service can be started, and will respond to requests.
-  GeneratedServiceIf* ping_impl = test_base->TakeOverService(make_unique<PingServiceImpl>(
-      rpc_mgr->metric_entity(), rpc_mgr->result_tracker()));
-  RETURN_IF_ERROR(rpc_mgr->RegisterService(10, 10, ping_impl,
-      static_cast<PingServiceImpl*>(ping_impl)->mem_tracker()));
-
-  // Test that a second service, that verifies the RPC payload is not corrupted,
-  // can be started.
-  GeneratedServiceIf* scan_mem_impl = test_base->TakeOverService(
-      make_unique<ScanMemServiceImpl>(rpc_mgr->metric_entity(),
-      rpc_mgr->result_tracker()));
-  RETURN_IF_ERROR(rpc_mgr->RegisterService(10, 10, scan_mem_impl,
-      static_cast<ScanMemServiceImpl*>(scan_mem_impl)->mem_tracker()));
-
-  FLAGS_num_acceptor_threads = 2;
-  FLAGS_num_reactor_threads = 10;
-  RETURN_IF_ERROR(rpc_mgr->StartServices(krpc_address));
-
-  unique_ptr<PingServiceProxy> ping_proxy;
-  RETURN_IF_ERROR(rpc_mgr->GetProxy<PingServiceProxy>(krpc_address, FLAGS_hostname,
-      &ping_proxy));
-
-  unique_ptr<ScanMemServiceProxy> scan_mem_proxy;
-  RETURN_IF_ERROR(rpc_mgr->GetProxy<ScanMemServiceProxy>(krpc_address, FLAGS_hostname,
-      &scan_mem_proxy));
-
-  RpcController controller;
-  srand(0);
-  // Randomly invoke either services to make sure a RpcMgr can host multiple
-  // services at the same time.
-  for (int i = 0; i < 100; ++i) {
-    controller.Reset();
-    if (random() % 2 == 0) {
-      PingRequestPB request;
-      PingResponsePB response;
-      KUDU_RETURN_IF_ERROR(ping_proxy->Ping(request, &response, &controller),
-          "unable to execute Ping() RPC.");
-      if (response.int_response() != 42) {
-          return Status(Substitute(
-              "Ping() failed. Incorrect response. Expected: 42; Got: $0",
-                  response.int_response()));
-      }
-    } else {
-      ScanMemRequestPB request;
-      ScanMemResponsePB response;
-      test_base->SetupScanMemRequest(&request, &controller);
-      KUDU_RETURN_IF_ERROR(scan_mem_proxy->ScanMem(request, &response, &controller),
-          "unable to execute ScanMem() RPC.");
-    }
-  }
-
-  return Status::OK();
-}
-
-} // namespace impala

http://git-wip-us.apache.org/repos/asf/impala/blob/5c541b96/be/src/rpc/rpc-mgr-test.cc
----------------------------------------------------------------------
diff --git a/be/src/rpc/rpc-mgr-test.cc b/be/src/rpc/rpc-mgr-test.cc
index 6a774bd..0d4ad47 100644
--- a/be/src/rpc/rpc-mgr-test.cc
+++ b/be/src/rpc/rpc-mgr-test.cc
@@ -15,8 +15,12 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include "rpc/rpc-mgr-test-base.h"
+#include "rpc/rpc-mgr-test.h"
+
+#include "kudu/util/monotime.h"
 #include "service/fe-support.h"
+#include "testutil/mini-kdc-wrapper.h"
+#include "util/counting-barrier.h"
 
 using kudu::rpc::GeneratedServiceIf;
 using kudu::rpc::RpcController;
@@ -28,18 +32,8 @@ DECLARE_int32(num_acceptor_threads);
 DECLARE_int32(rpc_negotiation_timeout_ms);
 DECLARE_string(hostname);
 
-namespace impala {
-
 // For tests that do not require kerberized testing, we use RpcTest.
-class RpcMgrTest : public RpcMgrTestBase<testing::Test> {
-  virtual void SetUp() override {
-    RpcMgrTestBase::SetUp();
-  }
-
-  virtual void TearDown() override {
-    RpcMgrTestBase::TearDown();
-  }
-};
+namespace impala {
 
 TEST_F(RpcMgrTest, MultipleServicesTls) {
   // TODO: We're starting a seperate RpcMgr here instead of configuring
@@ -57,12 +51,12 @@ TEST_F(RpcMgrTest, MultipleServicesTls) {
   ScopedSetTlsFlags s(SERVER_CERT, PRIVATE_KEY, SERVER_CERT);
   ASSERT_OK(tls_rpc_mgr.Init());
 
-  ASSERT_OK(RunMultipleServicesTestTemplate(this, &tls_rpc_mgr, tls_krpc_address));
+  ASSERT_OK(RunMultipleServicesTest(&tls_rpc_mgr, tls_krpc_address));
   tls_rpc_mgr.Shutdown();
 }
 
 TEST_F(RpcMgrTest, MultipleServices) {
-  ASSERT_OK(RunMultipleServicesTestTemplate(this, &rpc_mgr_, krpc_address_));
+  ASSERT_OK(RunMultipleServicesTest(&rpc_mgr_, krpc_address_));
 }
 
 // Test with a misconfigured TLS certificate and verify that an error is thrown.
@@ -112,7 +106,7 @@ TEST_F(RpcMgrTest, CorrectPasswordTls) {
   tls_krpc_address = MakeNetworkAddress(ip, tls_service_port);
 
   ASSERT_OK(tls_rpc_mgr.Init());
-  ASSERT_OK(RunMultipleServicesTestTemplate(this, &tls_rpc_mgr, tls_krpc_address));
+  ASSERT_OK(RunMultipleServicesTest(&tls_rpc_mgr, tls_krpc_address));
   tls_rpc_mgr.Shutdown();
 }
 
@@ -146,7 +140,7 @@ TEST_F(RpcMgrTest, ValidCiphersTls) {
   tls_krpc_address = MakeNetworkAddress(ip, tls_service_port);
 
   ASSERT_OK(tls_rpc_mgr.Init());
-  ASSERT_OK(RunMultipleServicesTestTemplate(this, &tls_rpc_mgr, tls_krpc_address));
+  ASSERT_OK(RunMultipleServicesTest(&tls_rpc_mgr, tls_krpc_address));
   tls_rpc_mgr.Shutdown();
 }
 
@@ -165,7 +159,7 @@ TEST_F(RpcMgrTest, ValidMultiCiphersTls) {
   tls_krpc_address = MakeNetworkAddress(ip, tls_service_port);
 
   ASSERT_OK(tls_rpc_mgr.Init());
-  ASSERT_OK(RunMultipleServicesTestTemplate(this, &tls_rpc_mgr, tls_krpc_address));
+  ASSERT_OK(RunMultipleServicesTest(&tls_rpc_mgr, tls_krpc_address));
   tls_rpc_mgr.Shutdown();
 }
 
@@ -179,8 +173,8 @@ TEST_F(RpcMgrTest, SlowCallback) {
   // Test a service which is slow to respond and has a short queue.
   // Set a timeout on the client side. Expect either a client timeout
   // or the service queue filling up.
-  GeneratedServiceIf* ping_impl = TakeOverService(make_unique<PingServiceImpl>(
-      rpc_mgr_.metric_entity(), rpc_mgr_.result_tracker(), slow_cb));
+  GeneratedServiceIf* ping_impl =
+      TakeOverService(make_unique<PingServiceImpl>(&rpc_mgr_, slow_cb));
   const int num_service_threads = 1;
   const int queue_size = 3;
   ASSERT_OK(rpc_mgr_.RegisterService(num_service_threads, queue_size, ping_impl,
@@ -205,8 +199,8 @@ TEST_F(RpcMgrTest, SlowCallback) {
 }
 
 TEST_F(RpcMgrTest, AsyncCall) {
-  GeneratedServiceIf* scan_mem_impl = TakeOverService(make_unique<ScanMemServiceImpl>(
-      rpc_mgr_.metric_entity(), rpc_mgr_.result_tracker()));
+  GeneratedServiceIf* scan_mem_impl =
+      TakeOverService(make_unique<ScanMemServiceImpl>(&rpc_mgr_));
   ASSERT_OK(rpc_mgr_.RegisterService(10, 10, scan_mem_impl,
       static_cast<ScanMemServiceImpl*>(scan_mem_impl)->mem_tracker()));
 
@@ -252,8 +246,7 @@ TEST_F(RpcMgrTest, NegotiationTimeout) {
   secondary_krpc_address = MakeNetworkAddress(ip, secondary_service_port);
 
   ASSERT_OK(secondary_rpc_mgr.Init());
-  ASSERT_FALSE(RunMultipleServicesTestTemplate(
-      this, &secondary_rpc_mgr, secondary_krpc_address).ok());
+  ASSERT_FALSE(RunMultipleServicesTest(&secondary_rpc_mgr, secondary_krpc_address).ok());
   secondary_rpc_mgr.Shutdown();
 }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/5c541b96/be/src/rpc/rpc-mgr-test.h
----------------------------------------------------------------------
diff --git a/be/src/rpc/rpc-mgr-test.h b/be/src/rpc/rpc-mgr-test.h
new file mode 100644
index 0000000..bd2ea96
--- /dev/null
+++ b/be/src/rpc/rpc-mgr-test.h
@@ -0,0 +1,300 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef IMPALA_RPC_RPC_MGR_TEST_H
+#define IMPALA_RPC_RPC_MGR_TEST_H
+
+#include "rpc/rpc-mgr.inline.h"
+
+#include "common/init.h"
+#include "exec/kudu-util.h"
+#include "kudu/rpc/rpc_context.h"
+#include "kudu/rpc/rpc_controller.h"
+#include "kudu/rpc/rpc_header.pb.h"
+#include "kudu/rpc/rpc_sidecar.h"
+#include "kudu/util/status.h"
+#include "runtime/exec-env.h"
+#include "runtime/mem-tracker.h"
+#include "testutil/gtest-util.h"
+#include "testutil/scoped-flag-setter.h"
+#include "util/network-util.h"
+#include "util/openssl-util.h"
+#include "util/test-info.h"
+
+#include "gen-cpp/rpc_test.proxy.h"
+#include "gen-cpp/rpc_test.service.h"
+#include "gen-cpp/rpc_test.pb.h"
+
+#include "common/names.h"
+
+using kudu::rpc::GeneratedServiceIf;
+using kudu::rpc::RpcController;
+using kudu::rpc::RpcContext;
+using kudu::rpc::RpcSidecar;
+using kudu::Slice;
+
+using namespace std;
+
+DECLARE_int32(num_reactor_threads);
+DECLARE_int32(num_acceptor_threads);
+DECLARE_string(hostname);
+
+DECLARE_string(ssl_client_ca_certificate);
+DECLARE_string(ssl_server_certificate);
+DECLARE_string(ssl_private_key);
+DECLARE_string(ssl_private_key_password_cmd);
+DECLARE_string(ssl_cipher_list);
+
+// The path of the current executable file that is required for passing into the SASL
+// library as the 'application name'.
+static string CURRENT_EXECUTABLE_PATH;
+
+namespace impala {
+
+static int32_t SERVICE_PORT = FindUnusedEphemeralPort();
+
+const static string IMPALA_HOME(getenv("IMPALA_HOME"));
+const string& SERVER_CERT =
+    Substitute("$0/be/src/testutil/server-cert.pem", IMPALA_HOME);
+const string& PRIVATE_KEY =
+    Substitute("$0/be/src/testutil/server-key.pem", IMPALA_HOME);
+const string& BAD_SERVER_CERT =
+    Substitute("$0/be/src/testutil/bad-cert.pem", IMPALA_HOME);
+const string& BAD_PRIVATE_KEY =
+    Substitute("$0/be/src/testutil/bad-key.pem", IMPALA_HOME);
+const string& PASSWORD_PROTECTED_PRIVATE_KEY =
+    Substitute("$0/be/src/testutil/server-key-password.pem", IMPALA_HOME);
+
+/// Sets the TLS flags to the given values for the lifetime of the object and resets them
+/// to empty strings on destruction. The flags are assumed to be empty by default.
+class ScopedSetTlsFlags {
+ public:
+  ScopedSetTlsFlags(const string& cert, const string& pkey, const string& ca_cert,
+      const string& pkey_passwd = "", const string& ciphers = "") {
+    SetFlags(cert, pkey, ca_cert, pkey_passwd, ciphers);
+  }
+
+  ~ScopedSetTlsFlags() { SetFlags("", "", "", "", ""); }
+
+ private:
+  /// Overwrites all five TLS-related flags with the given values.
+  static void SetFlags(const string& cert, const string& pkey, const string& ca_cert,
+      const string& pkey_passwd, const string& ciphers) {
+    FLAGS_ssl_server_certificate = cert;
+    FLAGS_ssl_private_key = pkey;
+    FLAGS_ssl_client_ca_certificate = ca_cert;
+    FLAGS_ssl_private_key_password_cmd = pkey_passwd;
+    FLAGS_ssl_cipher_list = ciphers;
+  }
+};
+
+// Only use TLSv1.0 compatible ciphers, as tests might run on machines with only TLSv1.0
+// support.
+const string TLS1_0_COMPATIBLE_CIPHER = "RC4-SHA";
+const string TLS1_0_COMPATIBLE_CIPHER_2 = "RC4-MD5";
+
+#define PAYLOAD_SIZE (4096)
+
+class RpcMgrTest : public testing::Test {
+ public:
+  // Utility function to initialize the parameters for a ScanMem RPC. Picks a random
+  // pattern, fills the first PAYLOAD_SIZE bytes of 'payload_' with it and adds them to
+  // 'controller' as a sidecar. Stores the pattern and the sidecar index in 'request'.
+  void SetupScanMemRequest(ScanMemRequestPB* request, RpcController* controller) {
+    int32_t pattern = random();
+    for (size_t i = 0; i < PAYLOAD_SIZE / sizeof(int32_t); ++i) payload_[i] = pattern;
+    int idx = -1;
+    Slice slice(reinterpret_cast<const uint8_t*>(payload_), PAYLOAD_SIZE);
+    // Fail the test instead of sending an unset 'idx' if the sidecar is rejected.
+    ASSERT_OK(FromKuduStatus(
+        controller->AddOutboundSidecar(RpcSidecar::FromSlice(slice), &idx)));
+    request->set_pattern(pattern);
+    request->set_sidecar_idx(idx);
+  }
+
+  // Utility function which randomly issues requests to PingService and ScanMemService.
+  Status RunMultipleServicesTest(RpcMgr* rpc_mgr, const TNetworkAddress& krpc_address);
+
+ protected:
+  TNetworkAddress krpc_address_;
+  RpcMgr rpc_mgr_;
+
+  virtual void SetUp() override {
+    IpAddr ip;
+    ASSERT_OK(HostnameToIpAddr(FLAGS_hostname, &ip));
+    krpc_address_ = MakeNetworkAddress(ip, SERVICE_PORT);
+    exec_env_.reset(new ExecEnv());
+    ASSERT_OK(rpc_mgr_.Init());
+  }
+
+  virtual void TearDown() override { rpc_mgr_.Shutdown(); }
+
+  // Takes over ownership of the newly created 'service' which needs to have a lifetime
+  // as long as 'rpc_mgr_' as RpcMgr::Shutdown() will call Shutdown() of 'service'.
+  GeneratedServiceIf* TakeOverService(std::unique_ptr<GeneratedServiceIf> service) {
+    services_.emplace_back(move(service));
+    return services_.back().get();
+  }
+
+ private:
+  // Sidecar buffer; SetupScanMemRequest() only uses its first PAYLOAD_SIZE bytes.
+  int32_t payload_[PAYLOAD_SIZE];
+
+  // Own all the services used by the test.
+  std::vector<std::unique_ptr<GeneratedServiceIf>> services_;
+
+  // Required to set up the RPC metric groups used by the service pool.
+  std::unique_ptr<ExecEnv> exec_env_;
+};
+
+typedef std::function<void(RpcContext*)> ServiceCB;
+
+class PingServiceImpl : public PingServiceIf {
+ public:
+  // 'cb' lets a test customise how each Ping RPC is completed; it succeeds by default.
+  PingServiceImpl(RpcMgr* rpc_mgr,
+      ServiceCB cb = [](RpcContext* ctx) { ctx->RespondSuccess(); })
+    : PingServiceIf(rpc_mgr->metric_entity(), rpc_mgr->result_tracker()),
+      rpc_mgr_(rpc_mgr), mem_tracker_(-1, "Ping Service"), cb_(cb) {}
+
+  // Lets the RpcMgr decide whether the caller may access this service.
+  virtual bool Authorize(const google::protobuf::Message* req,
+      google::protobuf::Message* resp, RpcContext* context) override {
+    return rpc_mgr_->Authorize("PingService", context, &mem_tracker_);
+  }
+
+  // Sets the response to the constant 42 and hands 'context' over to the callback.
+  virtual void Ping(const PingRequestPB* request, PingResponsePB* response,
+      RpcContext* context) override {
+    response->set_int_response(42);
+    // Incoming requests will already be tracked and we need to release the memory.
+    mem_tracker_.Release(context->GetTransferSize());
+    cb_(context);
+  }
+
+  MemTracker* mem_tracker() { return &mem_tracker_; }
+
+ private:
+  RpcMgr* rpc_mgr_;
+  MemTracker mem_tracker_;
+  ServiceCB cb_;
+};
+
+// Test service which checks that the sidecar payload of every ScanMem RPC is intact.
+class ScanMemServiceImpl : public ScanMemServiceIf {
+ public:
+  ScanMemServiceImpl(RpcMgr* rpc_mgr)
+    : ScanMemServiceIf(rpc_mgr->metric_entity(), rpc_mgr->result_tracker()),
+      mem_tracker_(-1, "ScanMem Service") {}
+
+  // A no-op authorization function.
+  virtual bool Authorize(const google::protobuf::Message* req,
+      google::protobuf::Message* resp, RpcContext* context) override {
+    return true;
+  }
+
+  // The request carries an int 'pattern' and a sidecar holding an array of ints. Scans
+  // the array and fails the RPC with a Corruption status if any element differs.
+  virtual void ScanMem(const ScanMemRequestPB* request, ScanMemResponsePB* response,
+      RpcContext* context) override {
+    int32_t pattern = request->pattern();
+    Slice payload;
+    ASSERT_OK(
+        FromKuduStatus(context->GetInboundSidecar(request->sidecar_idx(), &payload)));
+    ASSERT_EQ(payload.size() % sizeof(int32_t), 0);
+
+    const int32_t* v = reinterpret_cast<const int32_t*>(payload.data());
+    for (size_t i = 0; i < payload.size() / sizeof(int32_t); ++i) {
+      int32_t val = v[i];
+      if (val != pattern) {
+        // Incoming requests will already be tracked and we need to release the memory.
+        mem_tracker_.Release(context->GetTransferSize());
+        // Substitute() placeholders are zero-based: $0 is 'pattern', $1 is 'val'.
+        context->RespondFailure(kudu::Status::Corruption(
+            Substitute("Expecting $0; Found $1", pattern, val)));
+        return;
+      }
+    }
+    // Incoming requests will already be tracked and we need to release the memory.
+    mem_tracker_.Release(context->GetTransferSize());
+    context->RespondSuccess();
+  }
+
+  MemTracker* mem_tracker() { return &mem_tracker_; }
+
+ private:
+  MemTracker mem_tracker_;
+};
+
+// Registers PingService and ScanMemService with 'rpc_mgr', starts them at 'krpc_address'
+// and issues 100 RPCs, each to a randomly chosen service. Returns the first error hit.
+Status RpcMgrTest::RunMultipleServicesTest(
+    RpcMgr* rpc_mgr, const TNetworkAddress& krpc_address) {
+  // Test that a service can be started, and will respond to requests.
+  GeneratedServiceIf* ping_impl = TakeOverService(make_unique<PingServiceImpl>(rpc_mgr));
+  RETURN_IF_ERROR(rpc_mgr->RegisterService(10, 10, ping_impl,
+      static_cast<PingServiceImpl*>(ping_impl)->mem_tracker()));
+
+  // Test that a second service, that verifies the RPC payload is not corrupted,
+  // can be started.
+  GeneratedServiceIf* scan_mem_impl =
+      TakeOverService(make_unique<ScanMemServiceImpl>(rpc_mgr));
+  RETURN_IF_ERROR(rpc_mgr->RegisterService(10, 10, scan_mem_impl,
+      static_cast<ScanMemServiceImpl*>(scan_mem_impl)->mem_tracker()));
+
+  FLAGS_num_acceptor_threads = 2;
+  FLAGS_num_reactor_threads = 10;
+  RETURN_IF_ERROR(rpc_mgr->StartServices(krpc_address));
+
+  unique_ptr<PingServiceProxy> ping_proxy;
+  RETURN_IF_ERROR(rpc_mgr->GetProxy<PingServiceProxy>(krpc_address, FLAGS_hostname,
+      &ping_proxy));
+  unique_ptr<ScanMemServiceProxy> scan_mem_proxy;
+  RETURN_IF_ERROR(rpc_mgr->GetProxy<ScanMemServiceProxy>(krpc_address, FLAGS_hostname,
+      &scan_mem_proxy));
+
+  RpcController controller;
+  // POSIX pairs random() with srandom(); srand() is only specified to seed rand().
+  srandom(0);
+  // Randomly invoke either service to make sure a RpcMgr can host multiple
+  // services at the same time.
+  for (int i = 0; i < 100; ++i) {
+    controller.Reset();
+    if (random() % 2 == 0) {
+      PingRequestPB request;
+      PingResponsePB response;
+      KUDU_RETURN_IF_ERROR(ping_proxy->Ping(request, &response, &controller),
+          "unable to execute Ping() RPC.");
+      if (response.int_response() != 42) {
+        return Status(Substitute(
+            "Ping() failed. Incorrect response. Expected: 42; Got: $0",
+            response.int_response()));
+      }
+    } else {
+      ScanMemRequestPB request;
+      ScanMemResponsePB response;
+      SetupScanMemRequest(&request, &controller);
+      KUDU_RETURN_IF_ERROR(scan_mem_proxy->ScanMem(request, &response, &controller),
+          "unable to execute ScanMem() RPC.");
+    }
+  }
+  return Status::OK();
+}
+
+} // namespace impala
+
+#endif

http://git-wip-us.apache.org/repos/asf/impala/blob/5c541b96/be/src/rpc/rpc-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/rpc/rpc-mgr.cc b/be/src/rpc/rpc-mgr.cc
index a95f181..cda7161 100644
--- a/be/src/rpc/rpc-mgr.cc
+++ b/be/src/rpc/rpc-mgr.cc
@@ -19,11 +19,15 @@
 
 #include "exec/kudu-util.h"
 #include "kudu/rpc/acceptor_pool.h"
+#include "kudu/rpc/remote_user.h"
+#include "kudu/rpc/rpc_context.h"
 #include "kudu/rpc/rpc_controller.h"
 #include "kudu/rpc/rpc_introspection.pb.h"
 #include "kudu/rpc/service_if.h"
+#include "kudu/security/init.h"
 #include "kudu/util/monotime.h"
 #include "kudu/util/net/net_util.h"
+#include "runtime/mem-tracker.h"
 #include "util/auth-util.h"
 #include "util/cpu-info.h"
 #include "util/network-util.h"
@@ -39,11 +43,13 @@ using kudu::MonoDelta;
 using kudu::rpc::AcceptorPool;
 using kudu::rpc::DumpRunningRpcsRequestPB;
 using kudu::rpc::DumpRunningRpcsResponsePB;
+using kudu::rpc::GeneratedServiceIf;
 using kudu::rpc::MessengerBuilder;
 using kudu::rpc::Messenger;
+using kudu::rpc::RemoteUser;
 using kudu::rpc::RpcConnectionPB;
+using kudu::rpc::RpcContext;
 using kudu::rpc::RpcController;
-using kudu::rpc::GeneratedServiceIf;
 using kudu::Sockaddr;
 
 DECLARE_string(hostname);
@@ -107,7 +113,6 @@ Status RpcMgr::Init() {
   if (IsKerberosEnabled()) {
     string internal_principal;
     RETURN_IF_ERROR(GetInternalKerberosPrincipal(&internal_principal));
-
     string service_name, unused_hostname, unused_realm;
     RETURN_IF_ERROR(ParseKerberosPrincipal(internal_principal, &service_name,
         &unused_hostname, &unused_realm));
@@ -151,6 +156,30 @@ Status RpcMgr::RegisterService(int32_t num_service_threads, int32_t service_queu
   return Status::OK();
 }
 
+bool RpcMgr::Authorize(const string& service_name, RpcContext* context,
+    MemTracker* mem_tracker) const {
+  // With Kerberos disabled there is nothing to check: every caller is admitted.
+  if (!IsKerberosEnabled()) return true;
+
+  // A caller is accepted only if it authenticated through Kerberos under the same
+  // username that this process obtained from its keytab.
+  const RemoteUser& caller = context->remote_user();
+  const string& expected_username =
+      kudu::security::GetLoggedInUsernameFromKeytab().value_or("");
+  DCHECK(!expected_username.empty());
+  const bool rejected = caller.username() != expected_username ||
+      caller.authenticated_by() != RemoteUser::Method::KERBEROS;
+  if (UNLIKELY(rejected)) {
+    LOG(ERROR) << Substitute("Rejecting unauthorized access to $0 from $1. Expected "
+        "user $2", service_name, context->requestor_string(), expected_username);
+    // Release the payload memory accounted to 'mem_tracker' and build the reply text
+    // before responding; everything that reads 'context' happens ahead of the reply.
+    mem_tracker->Release(context->GetTransferSize());
+    const string denial_msg =
+        Substitute("$0 is not allowed to access $1", caller.ToString(), service_name);
+    context->RespondFailure(kudu::Status::NotAuthorized(denial_msg));
+    return false;
+  }
+  return true;
+}
+
 Status RpcMgr::StartServices(const TNetworkAddress& address) {
   DCHECK(is_inited()) << "Must call Init() before StartServices()";
   DCHECK(!services_started_) << "May not call StartServices() twice";

http://git-wip-us.apache.org/repos/asf/impala/blob/5c541b96/be/src/rpc/rpc-mgr.h
----------------------------------------------------------------------
diff --git a/be/src/rpc/rpc-mgr.h b/be/src/rpc/rpc-mgr.h
index c25f754..6435a74 100644
--- a/be/src/rpc/rpc-mgr.h
+++ b/be/src/rpc/rpc-mgr.h
@@ -133,6 +133,16 @@ class RpcMgr {
       kudu::rpc::GeneratedServiceIf* service_ptr, MemTracker* service_mem_tracker)
       WARN_UNUSED_RESULT;
 
+  /// Returns true if the given 'remote_user' in RpcContext 'context' is authorized to
+  /// access 'service_name' registered with this RpcMgr. Authorization is only enforced
+  /// when Kerberos is enabled.
+  ///
+  /// If authorization is denied, the RPC is responded to with an error message. Memory
+  /// of RPC payloads accounted towards 'mem_tracker', the service's MemTracker, is also
+  /// released.
+  bool Authorize(const string& service_name, kudu::rpc::RpcContext* context,
+      MemTracker* mem_tracker) const;
+
   /// Creates a new proxy for a remote service of type P at location 'address' with
   /// hostname 'hostname' and places it in 'proxy'. 'P' must descend from
   /// kudu::rpc::ServiceIf. Note that 'address' must be a resolved IP address.

http://git-wip-us.apache.org/repos/asf/impala/blob/5c541b96/be/src/rpc/thrift-server-test.cc
----------------------------------------------------------------------
diff --git a/be/src/rpc/thrift-server-test.cc b/be/src/rpc/thrift-server-test.cc
index af867de..621f557 100644
--- a/be/src/rpc/thrift-server-test.cc
+++ b/be/src/rpc/thrift-server-test.cc
@@ -37,6 +37,7 @@ using apache::thrift::transport::SSLProtocol;
 
 DECLARE_string(principal);
 DECLARE_string(be_principal);
+DECLARE_string(keytab_file);
 DECLARE_string(ssl_client_ca_certificate);
 DECLARE_string(ssl_cipher_list);
 DECLARE_string(ssl_minimum_version);
@@ -59,8 +60,9 @@ static const string& PASSWORD_PROTECTED_PRIVATE_KEY =
     Substitute("$0/be/src/testutil/server-key-password.pem", IMPALA_HOME);
 
 // The principal name and the realm used for creating the mini-KDC.
-static const string kdc_principal = "impala/localhost";
-static const string kdc_realm = "KRBTEST.COM";
+static const string principal = "impala/localhost";
+static const string realm = "KRBTEST.COM";
+static string principal_kt_path;
 
 // Only use TLSv1.0 compatible ciphers, as tests might run on machines with only TLSv1.0
 // support.
@@ -105,9 +107,11 @@ class ThriftKerberizedParamsTest :
     if (k == KERBEROS_OFF) {
       FLAGS_principal.clear();
       FLAGS_be_principal.clear();
+      FLAGS_keytab_file.clear();
     } else {
       FLAGS_principal = "dummy-service/host@realm";
-      FLAGS_be_principal = strings::Substitute("$0@$1", kdc_principal, kdc_realm);
+      FLAGS_be_principal = strings::Substitute("$0@$1", principal, realm);
+      FLAGS_keytab_file = principal_kt_path;
     }
     ASSERT_OK(InitAuth(CURRENT_EXECUTABLE_PATH));
     ThriftTestBase::SetUp();
@@ -116,6 +120,7 @@ class ThriftKerberizedParamsTest :
   virtual void TearDown() override {
     FLAGS_principal.clear();
     FLAGS_be_principal.clear();
+    FLAGS_keytab_file.clear();
   }
 };
 
@@ -558,8 +563,12 @@ int main(int argc, char** argv) {
 
   int port = impala::FindUnusedEphemeralPort();
   std::unique_ptr<impala::MiniKdcWrapper> kdc;
-  Status status = impala::MiniKdcWrapper::SetupAndStartMiniKDC(
-      kdc_principal, kdc_realm, "24h", "7d", port, &kdc);
+  Status status =
+      impala::MiniKdcWrapper::SetupAndStartMiniKDC(realm, "24h", "7d", port, &kdc);
+  DCHECK(status.ok());
+
+  // Create the service principal and keytab used for this test.
+  status = kdc->CreateServiceKeytab(principal, &principal_kt_path);
   DCHECK(status.ok());
 
   // Fill in the path of the current binary for use by the tests.

http://git-wip-us.apache.org/repos/asf/impala/blob/5c541b96/be/src/runtime/data-stream-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/data-stream-test.cc b/be/src/runtime/data-stream-test.cc
index c1f9cc6..de5e349 100644
--- a/be/src/runtime/data-stream-test.cc
+++ b/be/src/runtime/data-stream-test.cc
@@ -107,6 +107,11 @@ class ImpalaKRPCTestBackend : public DataStreamServiceIf {
     return rpc_mgr_->RegisterService(CpuInfo::num_cores(), 1024, this, mem_tracker());
   }
 
+  /// Authorization hook for this test backend: admits every caller. None of the
+  /// arguments are inspected.
+  virtual bool Authorize(const google::protobuf::Message* req,
+      google::protobuf::Message* resp, kudu::rpc::RpcContext* context) {
+    return true;
+  }
+
   virtual void TransmitData(const TransmitDataRequestPB* request,
       TransmitDataResponsePB* response, RpcContext* rpc_context) {
     stream_mgr_->AddData(request, response, rpc_context);

http://git-wip-us.apache.org/repos/asf/impala/blob/5c541b96/be/src/service/data-stream-service.cc
----------------------------------------------------------------------
diff --git a/be/src/service/data-stream-service.cc b/be/src/service/data-stream-service.cc
index b7892ff..723d7ca 100644
--- a/be/src/service/data-stream-service.cc
+++ b/be/src/service/data-stream-service.cc
@@ -71,6 +71,12 @@ Status DataStreamService::Init() {
   return Status::OK();
 }
 
+bool DataStreamService::Authorize(const google::protobuf::Message* req,
+    google::protobuf::Message* resp, RpcContext* context) {
+  // The access check itself lives in RpcMgr; 'req' and 'resp' are not consulted here.
+  auto* rpc_mgr = ExecEnv::GetInstance()->rpc_mgr();
+  return rpc_mgr->Authorize("DataStreamService", context, mem_tracker());
+}
+
 void DataStreamService::EndDataStream(const EndDataStreamRequestPB* request,
     EndDataStreamResponsePB* response, RpcContext* rpc_context) {
   // CloseSender() is guaranteed to eventually respond to this RPC so we don't do it here.

http://git-wip-us.apache.org/repos/asf/impala/blob/5c541b96/be/src/service/data-stream-service.h
----------------------------------------------------------------------
diff --git a/be/src/service/data-stream-service.h b/be/src/service/data-stream-service.h
index e233165..5fdf6dd 100644
--- a/be/src/service/data-stream-service.h
+++ b/be/src/service/data-stream-service.h
@@ -46,6 +46,12 @@ class DataStreamService : public DataStreamServiceIf {
   /// This mustn't be called until RPC manager has been initialized.
   Status Init();
 
+  /// Returns true iff the 'remote_user' in 'context' is authorized to access
+  /// DataStreamService. On denied access, the RPC is replied to with an error message.
+  /// Authorization is enforced only when Kerberos is enabled.
+  virtual bool Authorize(const google::protobuf::Message* req,
+      google::protobuf::Message* resp, kudu::rpc::RpcContext* context);
+
   /// Notifies the receiver to close the data stream specified in 'request'.
   /// The receiver replies to the client with a status serialized in 'response'.
   virtual void EndDataStream(const EndDataStreamRequestPB* request,

http://git-wip-us.apache.org/repos/asf/impala/blob/5c541b96/be/src/testutil/mini-kdc-wrapper.cc
----------------------------------------------------------------------
diff --git a/be/src/testutil/mini-kdc-wrapper.cc b/be/src/testutil/mini-kdc-wrapper.cc
index 526e5b1..11c4d64 100644
--- a/be/src/testutil/mini-kdc-wrapper.cc
+++ b/be/src/testutil/mini-kdc-wrapper.cc
@@ -32,11 +32,12 @@ using filesystem::path;
 
 DECLARE_string(keytab_file);
 DECLARE_string(krb5_conf);
+DECLARE_string(krb5_ccname);
 
 Status MiniKdcWrapper::StartKdc(string keytab_dir) {
   kudu::MiniKdcOptions options;
   options.realm = realm_;
-  options.data_root = keytab_dir;
+  options.data_root = move(keytab_dir);
   options.ticket_lifetime = ticket_lifetime_;
   options.renew_lifetime = renew_lifetime_;
   options.port = kdc_port_;
@@ -54,34 +55,42 @@ Status MiniKdcWrapper::StopKdc() {
   return Status::OK();
 }
 
+// Forwards to kdc_->Kinit() for 'username' and returns Status::OK() once that call
+// succeeds. Failure handling is delegated to KUDU_RETURN_IF_ERROR, which is given the
+// message "Failed to kinit.".
+Status MiniKdcWrapper::Kinit(const string& username) {
+  KUDU_RETURN_IF_ERROR(kdc_->Kinit(username), "Failed to kinit.");
+  return Status::OK();
+}
+
+// Forwards to kdc_->CreateUserPrincipal() for 'username' and returns Status::OK() once
+// that call succeeds. Failure handling is delegated to KUDU_RETURN_IF_ERROR, which is
+// given the message "Failed to create user principal.".
+Status MiniKdcWrapper::CreateUserPrincipal(const string& username) {
+  KUDU_RETURN_IF_ERROR(kdc_->CreateUserPrincipal(username),
+      "Failed to create user principal.");
+  return Status::OK();
+}
+
 Status MiniKdcWrapper::CreateServiceKeytab(const string& spn, string* kt_path) {
   KUDU_RETURN_IF_ERROR(kdc_->CreateServiceKeytab(spn, kt_path),
       "Failed to create service keytab.");
   return Status::OK();
 }
 
-Status MiniKdcWrapper::SetupAndStartMiniKDC(string spn, string realm,
-    string ticket_lifetime, string renew_lifetime, int kdc_port,
-    unique_ptr<MiniKdcWrapper>* kdc_ptr) {
-  std::unique_ptr<MiniKdcWrapper> kdc(new MiniKdcWrapper(
-      spn, realm, ticket_lifetime, renew_lifetime, kdc_port));
+Status MiniKdcWrapper::SetupAndStartMiniKDC(string realm,
+    string ticket_lifetime, string renew_lifetime,
+    int kdc_port, unique_ptr<MiniKdcWrapper>* kdc_ptr) {
+  unique_ptr<MiniKdcWrapper> kdc(new MiniKdcWrapper(
+      move(realm), move(ticket_lifetime), move(renew_lifetime), kdc_port));
   DCHECK(kdc.get() != nullptr);
 
   // Enable the workaround for MIT krb5 1.10 bugs from krb5_realm_override.cc.
   setenv("KUDU_ENABLE_KRB5_REALM_FIX", "true", 0);
 
   // Check if the unique directory already exists, and create it if it doesn't.
-  RETURN_IF_ERROR(FileSystemUtil::RemoveAndCreateDirectory(kdc->unique_test_dir_.string()));
+  RETURN_IF_ERROR(
+      FileSystemUtil::RemoveAndCreateDirectory(kdc->unique_test_dir_.string()));
   string keytab_dir = kdc->unique_test_dir_.string() + "/krb5kdc";
 
   RETURN_IF_ERROR(kdc->StartKdc(keytab_dir));
 
-  string kt_path;
-  RETURN_IF_ERROR(kdc->CreateServiceKeytab(kdc->spn_, &kt_path));
-
   // Set the appropriate flags based on how we've set up the kerberos environment.
   FLAGS_krb5_conf = strings::Substitute("$0/$1", keytab_dir, "krb5.conf");
-  FLAGS_keytab_file = kt_path;
 
   *kdc_ptr = std::move(kdc);
   return Status::OK();
@@ -91,7 +100,6 @@ Status MiniKdcWrapper::TearDownMiniKDC() {
   RETURN_IF_ERROR(StopKdc());
 
   // Clear the flags so we don't step on other tests that may run in the same process.
-  FLAGS_keytab_file.clear();
   FLAGS_krb5_conf.clear();
 
   // Remove test directory.

http://git-wip-us.apache.org/repos/asf/impala/blob/5c541b96/be/src/testutil/mini-kdc-wrapper.h
----------------------------------------------------------------------
diff --git a/be/src/testutil/mini-kdc-wrapper.h b/be/src/testutil/mini-kdc-wrapper.h
index 602c15b..c233dda 100644
--- a/be/src/testutil/mini-kdc-wrapper.h
+++ b/be/src/testutil/mini-kdc-wrapper.h
@@ -40,19 +40,33 @@ class MiniKdcWrapper {
   /// This function creates the 'unique_test_dir_' path, starts the KDC and sets the
   /// appropriate flags that Impala requires to run with Kerberos. The newly created
   /// KDC is stored in 'kdc_ptr'. Return error status on failure.
-  static Status SetupAndStartMiniKDC(std::string spn, std::string realm,
-      std::string ticket_lifetime, std::string renew_lifetime, int kdc_port,
-      std::unique_ptr<MiniKdcWrapper>* kdc_ptr);
+  static Status SetupAndStartMiniKDC(std::string realm,
+      std::string ticket_lifetime, std::string renew_lifetime,
+      int kdc_port, std::unique_ptr<MiniKdcWrapper>* kdc_ptr);
 
   /// Undoes everything done by SetupAndStartMiniKDC().
   Status TearDownMiniKDC();
 
+  /// Kinit a user to the mini KDC.
+  Status Kinit(const string& username);
+
+  /// Creates a new user with the given username.
+  /// The password is the same as the username.
+  Status CreateUserPrincipal(const string& username);
+
+  /// Creates a keytab file under the 'unique_test_dir_' path which is configured to
+  /// authenticate the service principal 'spn'. The path to the file is returned as a
+  /// string in 'kt_path'.
+  Status CreateServiceKeytab(const string& spn, string* kt_path);
+
+  /// Returns the environment variable "KRB5CCNAME" configured in the setup of mini-kdc.
+  const string GetKrb5CCname() const {
+    return kdc_->GetEnvVars()["KRB5CCNAME"];
+  }
+
  private:
   boost::scoped_ptr<kudu::MiniKdc> kdc_;
 
-  /// The service's principal name.
-  const std::string spn_;
-
   /// The name of the kerberos realm to setup.
   const std::string realm_;
 
@@ -69,26 +83,20 @@ class MiniKdcWrapper {
   boost::filesystem::path unique_test_dir_ = boost::filesystem::unique_path();
 
   /// Called by SetupAndStartMiniKDC() only.
-  MiniKdcWrapper(std::string spn, std::string realm, std::string ticket_lifetime,
-    std::string renew_lifetime, int kdc_port) :
-      spn_(spn),
-      realm_(realm),
-      ticket_lifetime_(ticket_lifetime),
-      renew_lifetime_(renew_lifetime),
+  MiniKdcWrapper(std::string&& realm, std::string&& ticket_lifetime,
+      std::string&& renew_lifetime, int kdc_port)
+    : realm_(std::move(realm)),
+      ticket_lifetime_(std::move(ticket_lifetime)),
+      renew_lifetime_(std::move(renew_lifetime)),
       kdc_port_(kdc_port) {
   }
 
   /// Starts the KDC and configures it to use 'keytab_dir' as the location to store the
   /// keytab. The 'keytab_dir' will not be cleaned up by this class.
-  Status StartKdc(string keytab_dir);
+  Status StartKdc(std::string keytab_dir);
 
   /// Stops the KDC by terminating the krb5kdc subprocess.
   Status StopKdc();
-
-  /// Creates a keytab file under the 'unique_test_dir_' path which is configured to
-  /// authenticate the service principal 'spn_'. The path to the file is returned as a
-  /// string in 'kt_path'.
-  Status CreateServiceKeytab(const string& spn, string* kt_path);
 };
 
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/5c541b96/be/src/util/auth-util.cc
----------------------------------------------------------------------
diff --git a/be/src/util/auth-util.cc b/be/src/util/auth-util.cc
index 50af438..e8a0e41 100644
--- a/be/src/util/auth-util.cc
+++ b/be/src/util/auth-util.cc
@@ -31,25 +31,25 @@ namespace impala {
 // Pattern for hostname substitution.
 static const string HOSTNAME_PATTERN = "_HOST";
 
-  const string& GetEffectiveUser(const TSessionState& session) {
-    if (session.__isset.delegated_user && !session.delegated_user.empty()) {
-      return session.delegated_user;
-    }
-    return session.connected_user;
+const string& GetEffectiveUser(const TSessionState& session) {
+  if (session.__isset.delegated_user && !session.delegated_user.empty()) {
+    return session.delegated_user;
   }
+  return session.connected_user;
+}
 
-  const string& GetEffectiveUser(const ImpalaServer::SessionState& session) {
-    return session.do_as_user.empty() ? session.connected_user : session.do_as_user;
-  }
+const string& GetEffectiveUser(const ImpalaServer::SessionState& session) {
+  return session.do_as_user.empty() ? session.connected_user : session.do_as_user;
+}
 
-  Status CheckProfileAccess(const string& user, const string& effective_user,
-      bool has_access) {
-    if (user.empty() || (user == effective_user && has_access)) return Status::OK();
-    stringstream ss;
-    ss << "User " << user << " is not authorized to access the runtime profile or "
-       << "execution summary.";
-    return Status(ss.str());
-  }
+Status CheckProfileAccess(const string& user, const string& effective_user,
+    bool has_access) {
+  if (user.empty() || (user == effective_user && has_access)) return Status::OK();
+  stringstream ss;
+  ss << "User " << user << " is not authorized to access the runtime profile or "
+     << "execution summary.";
+  return Status(ss.str());
+}
 
 // Replaces _HOST with the hostname if it occurs in the principal string.
 Status ReplacePrincipalHostFormat(string* out_principal) {
@@ -83,6 +83,7 @@ Status GetInternalKerberosPrincipal(string* out_principal) {
 
 Status ParseKerberosPrincipal(const string& principal, string* service_name,
     string* hostname, string* realm) {
+  // TODO: IMPALA-7504: replace this with krb5_parse_name().
   vector<string> names;
 
   split(names, principal, is_any_of("/"));
@@ -100,8 +101,4 @@ Status ParseKerberosPrincipal(const string& principal, string* service_name,
   return Status::OK();
 }
 
-bool IsKerberosEnabled() {
-  return !FLAGS_principal.empty();
-}
-
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/5c541b96/be/src/util/auth-util.h
----------------------------------------------------------------------
diff --git a/be/src/util/auth-util.h b/be/src/util/auth-util.h
index 4b9fed6..f73e962 100644
--- a/be/src/util/auth-util.h
+++ b/be/src/util/auth-util.h
@@ -22,6 +22,8 @@
 #include <string>
 #include "service/impala-server.h"
 
+DECLARE_string(principal);
+
 namespace impala {
 
 class TSessionState;
@@ -63,7 +65,9 @@ Status ParseKerberosPrincipal(const std::string& principal, std::string* service
     std::string* hostname, std::string* realm);
 
 /// Returns true if kerberos is enabled.
-bool IsKerberosEnabled();
+inline bool IsKerberosEnabled() {
+  // Kerberos counts as enabled whenever the --principal flag is non-empty.
+  return !FLAGS_principal.empty();
+}
 
 } // namespace impala
 #endif

http://git-wip-us.apache.org/repos/asf/impala/blob/5c541b96/bin/rat_exclude_files.txt
----------------------------------------------------------------------
diff --git a/bin/rat_exclude_files.txt b/bin/rat_exclude_files.txt
index 10f36af..c0c8fe2 100644
--- a/bin/rat_exclude_files.txt
+++ b/bin/rat_exclude_files.txt
@@ -43,6 +43,7 @@ www/d3.v3.min.js
 www/jquery/jquery-1.12.4.min.js
 tests/comparison/leopard/static/css/hljs.css
 tests/comparison/leopard/static/js/highlight.pack.js
+common/protobuf/kudu
 be/src/kudu/util/array_view.h
 be/src/kudu/util/cache-test.cc
 be/src/kudu/util/cache.cc

http://git-wip-us.apache.org/repos/asf/impala/blob/5c541b96/common/protobuf/data_stream_service.proto
----------------------------------------------------------------------
diff --git a/common/protobuf/data_stream_service.proto b/common/protobuf/data_stream_service.proto
index 854eb87..68c2e90 100644
--- a/common/protobuf/data_stream_service.proto
+++ b/common/protobuf/data_stream_service.proto
@@ -21,6 +21,8 @@ package impala;
 import "common.proto";
 import "row_batch.proto";
 
+import "kudu/rpc/rpc_header.proto";
+
 // All fields are required in V1.
 message TransmitDataRequestPB {
   // The fragment instance id of the receiver.
@@ -76,6 +78,9 @@ message EndDataStreamResponsePB {
 
 // Handles data transmission between fragment instances.
 service DataStreamService {
+  // Override the default authorization method.
+  option (kudu.rpc.default_authz_method) = "Authorize";
+
   // Called by sender to transmit a single row batch. Returns error indication
   // if params.fragmentId or params.destNodeId are unknown or if data couldn't
   // be read.

http://git-wip-us.apache.org/repos/asf/impala/blob/5c541b96/common/protobuf/kudu
----------------------------------------------------------------------
diff --git a/common/protobuf/kudu b/common/protobuf/kudu
new file mode 120000
index 0000000..6631864
--- /dev/null
+++ b/common/protobuf/kudu
@@ -0,0 +1 @@
+../../be/src/kudu/
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/impala/blob/5c541b96/common/protobuf/rpc_test.proto
----------------------------------------------------------------------
diff --git a/common/protobuf/rpc_test.proto b/common/protobuf/rpc_test.proto
index fd22331..70782e5 100644
--- a/common/protobuf/rpc_test.proto
+++ b/common/protobuf/rpc_test.proto
@@ -17,6 +17,8 @@
 //
 package impala;
 
+import "kudu/rpc/rpc_header.proto";
+
 // Definitions for service used for rpc-mgr-test.
 message PingRequestPB {
 }
@@ -26,6 +28,8 @@ message PingResponsePB {
 }
 
 service PingService {
+  option (kudu.rpc.default_authz_method) = "Authorize";
+
   rpc Ping(PingRequestPB) returns (PingResponsePB);
 }
 
@@ -38,5 +42,7 @@ message ScanMemResponsePB {
 }
 
 service ScanMemService {
+  option (kudu.rpc.default_authz_method) = "Authorize";
+
   rpc ScanMem(ScanMemRequestPB) returns (ScanMemResponsePB);
 }


[4/7] impala git commit: IMPALA-6644: Add last heartbeat timestamp into Statestore metric

Posted by kw...@apache.org.
IMPALA-6644: Add last heartbeat timestamp into Statestore metric

After this patch, the statestore keeps track of the time since the
last heartbeat for each subscriber. It is exposed as a subscriber
metric on the statestore debug page. It also adds a monitoring
thread that periodically checks the last heartbeat timestamp for
all subscribers and logs the IDs of those that have not been
updated since the last periodic check.

Testing: Added an end-to-end test to verify the 'secs_since_heartbeat'
metric of a slow subscriber.

Change-Id: I754adccc4569e8219d5d01500cccdfc8782953f7
Reviewed-on: http://gerrit.cloudera.org:8080/11052
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/8692bfbe
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/8692bfbe
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/8692bfbe

Branch: refs/heads/master
Commit: 8692bfbef657fe95da68e9dcaca9b49de331ccc3
Parents: 8848588
Author: poojanilangekar <po...@cloudera.com>
Authored: Tue Jul 24 18:01:21 2018 -0700
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Wed Aug 29 22:06:00 2018 +0000

----------------------------------------------------------------------
 be/src/statestore/statestore.cc     | 55 ++++++++++++++++++++++++++++++--
 be/src/statestore/statestore.h      | 30 ++++++++++++++++-
 tests/statestore/test_statestore.py | 20 ++++++++++++
 www/statestore_subscribers.tmpl     |  2 ++
 4 files changed, 104 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/8692bfbe/be/src/statestore/statestore.cc
----------------------------------------------------------------------
diff --git a/be/src/statestore/statestore.cc b/be/src/statestore/statestore.cc
index 8749825..4e63dad 100644
--- a/be/src/statestore/statestore.cc
+++ b/be/src/statestore/statestore.cc
@@ -75,6 +75,8 @@ DEFINE_int32(statestore_num_heartbeat_threads, 10, "(Advanced) Number of threads
     " send heartbeats in parallel to all registered subscribers.");
 DEFINE_int32(statestore_heartbeat_frequency_ms, 1000, "(Advanced) Frequency (in ms) with"
     " which the statestore sends heartbeat heartbeats to subscribers.");
+DEFINE_double_hidden(heartbeat_monitoring_frequency_ms, 60000, "(Advanced) Frequency (in "
+    "ms) with which the statestore monitors heartbeats from a subscriber.");
 
 DEFINE_int32(state_store_port, 24000, "port where StatestoreService is running");
 
@@ -315,6 +317,7 @@ Statestore::Subscriber::Subscriber(const SubscriberId& subscriber_id,
   : subscriber_id_(subscriber_id),
     registration_id_(registration_id),
     network_address_(network_address) {
+  RefreshLastHeartbeatTimestamp();
   for (const TTopicRegistration& topic : subscribed_topics) {
     GetTopicsMapForId(topic.topic_name)
         ->emplace(piecewise_construct, forward_as_tuple(topic.topic_name),
@@ -388,6 +391,11 @@ void Statestore::Subscriber::SetLastTopicVersionProcessed(const TopicId& topic_i
   topic_it->second.last_version.Store(version);
 }
 
+// Stores the current MonotonicMillis() reading as this subscriber's last heartbeat
+// timestamp.
+void Statestore::Subscriber::RefreshLastHeartbeatTimestamp() {
+  // Sanity check: the stored timestamp must never be ahead of the current clock
+  // reading. The clock is read again for the store below, after this check.
+  DCHECK_GE(MonotonicMillis(), last_heartbeat_ts_.Load());
+  last_heartbeat_ts_.Store(MonotonicMillis());
+}
+
 Statestore::Statestore(MetricGroup* metrics)
   : subscriber_topic_update_threadpool_("statestore-update",
         "subscriber-update-worker",
@@ -419,7 +427,6 @@ Statestore::Statestore(MetricGroup* metrics)
     failure_detector_(new MissedHeartbeatFailureDetector(
         FLAGS_statestore_max_missed_heartbeats,
         FLAGS_statestore_max_missed_heartbeats / 2)) {
-
   DCHECK(metrics != NULL);
   metrics_ = metrics;
   num_subscribers_metric_ = metrics->AddGauge(STATESTORE_LIVE_SUBSCRIBERS, 0);
@@ -440,6 +447,10 @@ Statestore::Statestore(MetricGroup* metrics)
   heartbeat_client_cache_->InitMetrics(metrics, "subscriber-heartbeat");
 }
 
+// Init() starts the heartbeat monitoring thread, which never returns and keeps using
+// this object, so destroying a Statestore after Init() has succeeded is a programming
+// error. Destroying one that was never initialized is fine.
+Statestore::~Statestore() {
+  CHECK(!initialized_) << "Cannot shutdown Statestore once initialized.";
+}
+
 Status Statestore::Init(int32_t state_store_port) {
   boost::shared_ptr<TProcessor> processor(new StatestoreServiceProcessor(thrift_iface()));
   boost::shared_ptr<TProcessorEventHandler> event_handler(
@@ -464,6 +475,9 @@ Status Statestore::Init(int32_t state_store_port) {
   RETURN_IF_ERROR(subscriber_topic_update_threadpool_.Init());
   RETURN_IF_ERROR(subscriber_priority_topic_update_threadpool_.Init());
   RETURN_IF_ERROR(subscriber_heartbeat_threadpool_.Init());
+  RETURN_IF_ERROR(Thread::Create("statestore-heartbeat", "heartbeat-monitoring-thread",
+      &Statestore::MonitorSubscriberHeartbeat, this, &heartbeat_monitoring_thread_));
+  initialized_ = true;
 
   return Status::OK();
 }
@@ -536,6 +550,12 @@ void Statestore::SubscribersHandler(const Webserver::ArgumentMap& args,
         document->GetAllocator());
     sub_json.AddMember("registration_id", registration_id, document->GetAllocator());
 
+    Value secs_since_heartbeat(
+        StringPrintf("%.3f", subscriber.second->SecondsSinceHeartbeat()).c_str(),
+        document->GetAllocator());
+    sub_json.AddMember(
+        "secs_since_heartbeat", secs_since_heartbeat, document->GetAllocator());
+
     subscribers.PushBack(sub_json, document->GetAllocator());
   }
   document->AddMember("subscribers", subscribers, document->GetAllocator());
@@ -898,7 +918,9 @@ void Statestore::DoSubscriberUpdate(UpdateKind update_kind, int thread_id,
   Status status;
   if (is_heartbeat) {
     status = SendHeartbeat(subscriber.get());
-    if (status.code() == TErrorCode::RPC_RECV_TIMEOUT) {
+    if (status.ok()) {
+      subscriber->RefreshLastHeartbeatTimestamp();
+    } else if (status.code() == TErrorCode::RPC_RECV_TIMEOUT) {
       // Add details to status to make it more useful, while preserving the stack
       status.AddDetail(Substitute(
           "Subscriber $0 timed-out during heartbeat RPC. Timeout is $1s.",
@@ -968,6 +990,35 @@ void Statestore::DoSubscriberUpdate(UpdateKind update_kind, int thread_id,
   }
 }
 
+// Body of the heartbeat monitoring thread; never returns. Once per monitoring period
+// (FLAGS_heartbeat_monitoring_frequency_ms) it logs how many registered subscribers
+// have had a successful heartbeat within that period and names those that have not.
+[[noreturn]] void Statestore::MonitorSubscriberHeartbeat() {
+  while (1) {
+    int num_subscribers;
+    vector<SubscriberId> inactive_subscribers;
+    SleepForMs(FLAGS_heartbeat_monitoring_frequency_ms);
+    {
+      // Only collect the ids under the lock; logging happens after it is released.
+      lock_guard<mutex> l(subscribers_lock_);
+      num_subscribers = subscribers_.size();
+      for (const auto& subscriber : subscribers_) {
+        // SecondsSinceHeartbeat() reports seconds while the flag is in milliseconds,
+        // so convert to a common unit before comparing.
+        const double ms_since_heartbeat =
+            subscriber.second->SecondsSinceHeartbeat() * 1000.0;
+        if (ms_since_heartbeat > FLAGS_heartbeat_monitoring_frequency_ms) {
+          inactive_subscribers.push_back(subscriber.second->id());
+        }
+      }
+    }
+    if (inactive_subscribers.empty()) {
+      LOG(INFO) << "All " << num_subscribers
+                << " subscribers successfully heartbeat in the last "
+                << FLAGS_heartbeat_monitoring_frequency_ms << "ms.";
+    } else {
+      int num_active_subscribers = num_subscribers - inactive_subscribers.size();
+      LOG(WARNING) << num_active_subscribers << "/" << num_subscribers
+                   << " subscribers successfully heartbeat in the last "
+                   << FLAGS_heartbeat_monitoring_frequency_ms << "ms."
+                   << " Slow subscribers: " << boost::join(inactive_subscribers, ", ");
+    }
+  }
+}
+
 void Statestore::UnregisterSubscriber(Subscriber* subscriber) {
   SubscriberMap::const_iterator it = subscribers_.find(subscriber->id());
   if (it == subscribers_.end() ||

http://git-wip-us.apache.org/repos/asf/impala/blob/8692bfbe/be/src/statestore/statestore.h
----------------------------------------------------------------------
diff --git a/be/src/statestore/statestore.h b/be/src/statestore/statestore.h
index 9326492..52f7d68 100644
--- a/be/src/statestore/statestore.h
+++ b/be/src/statestore/statestore.h
@@ -18,6 +18,7 @@
 #ifndef STATESTORE_STATESTORE_H
 #define STATESTORE_STATESTORE_H
 
+#include <atomic>
 #include <cstdint>
 #include <map>
 #include <memory>
@@ -130,6 +131,9 @@ class Statestore : public CacheLineAligned {
   /// The only constructor; initialises member variables only.
   Statestore(MetricGroup* metrics);
 
+  /// Destructor, should not be called once the Statestore is initialized.
+  ~Statestore();
+
   /// Initialize and start the backing ThriftServer with port 'state_store_port'.
   /// Initialize the ThreadPools used for updates and heartbeats. Returns an error if
   /// any of the above initialization fails.
@@ -150,7 +154,7 @@ class Statestore : public CacheLineAligned {
 
   void RegisterWebpages(Webserver* webserver);
 
-  /// The main processing loop. Blocks until the exit flag is set.
+  /// The main processing loop. Runs infinitely.
   void MainLoop();
 
   /// Returns the Thrift API interface that proxies requests onto the local Statestore.
@@ -384,6 +388,12 @@ class Statestore : public CacheLineAligned {
     const SubscriberId& id() const { return subscriber_id_; }
     const RegistrationId& registration_id() const { return registration_id_; }
 
+    /// Returns how long ago, in seconds, the last heartbeat timestamp was recorded.
+    double SecondsSinceHeartbeat() const {
+      const auto elapsed_ms = MonotonicMillis() - last_heartbeat_ts_.Load();
+      return static_cast<double>(elapsed_ms) / 1000.0;
+    }
+
     /// Get the Topics map that would be used to store 'topic_id'.
     const Topics& GetTopicsMapForId(const TopicId& topic_id) const {
       return IsPrioritizedTopic(topic_id) ? priority_subscribed_topics_
@@ -427,6 +437,9 @@ class Statestore : public CacheLineAligned {
     void SetLastTopicVersionProcessed(const TopicId& topic_id,
         TopicEntry::Version version);
 
+    /// Refresh the subscriber's last heartbeat timestamp to the current monotonic time.
+    void RefreshLastHeartbeatTimestamp();
+
    private:
     /// Unique human-readable identifier for this subscriber, set by the subscriber itself
     /// on a Register call.
@@ -449,6 +462,10 @@ class Statestore : public CacheLineAligned {
     Topics priority_subscribed_topics_;
     Topics non_priority_subscribed_topics_;
 
+    /// The timestamp of the last successful heartbeat in milliseconds. A timestamp much
+    /// older than the heartbeat frequency implies an unresponsive subscriber.
+    AtomicInt64 last_heartbeat_ts_{0};
+
     /// Lock held when adding or deleting transient entries. See class comment for lock
     /// acquisition order.
     boost::mutex transient_entry_lock_;
@@ -534,6 +551,12 @@ class Statestore : public CacheLineAligned {
 
   ThreadPool<ScheduledSubscriberUpdate> subscriber_heartbeat_threadpool_;
 
+  /// Thread that monitors the heartbeats of all subscribers.
+  std::unique_ptr<Thread> heartbeat_monitoring_thread_;
+
+  /// Flag to indicate that the statestore has been initialized.
+  bool initialized_ = false;
+
   /// Cache of subscriber clients used for UpdateState() RPCs. Only one client per
   /// subscriber should be used, but the cache helps with the client lifecycle on failure.
   boost::scoped_ptr<StatestoreSubscriberClientCache> update_state_client_cache_;
@@ -683,6 +706,11 @@ class Statestore : public CacheLineAligned {
   void SubscribersHandler(const Webserver::ArgumentMap& args,
       rapidjson::Document* document);
 
+  /// Monitors the heartbeats of all subscribers every
+  /// FLAGS_heartbeat_monitoring_frequency_ms milliseconds. If a subscriber's
+  /// last_heartbeat_ts_ has not been updated in that interval, it logs the subscriber's
+  /// id.
+  [[noreturn]] void MonitorSubscriberHeartbeat();
 };
 
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/8692bfbe/tests/statestore/test_statestore.py
----------------------------------------------------------------------
diff --git a/tests/statestore/test_statestore.py b/tests/statestore/test_statestore.py
index 8f26b63..23f8aa8 100644
--- a/tests/statestore/test_statestore.py
+++ b/tests/statestore/test_statestore.py
@@ -18,6 +18,7 @@
 from collections import defaultdict
 import json
 import logging
+from random import randint
 import socket
 import threading
 import traceback
@@ -524,6 +525,25 @@ class TestStatestore():
            .wait_for_failure(timeout=60)
        )
 
+  def test_slow_subscriber(self):
+    """Test for IMPALA-6644: This test kills a healthy subscriber and sleeps for a random
+    interval between 1 and 9 seconds, this lets the heartbeats fail without removing the
+    subscriber from the set of active subscribers. It then checks the subscribers page
+    of the statestore to ensure that the 'secs_since_heartbeat' field is updated with an
+    acceptable value. Since the statestore heartbeats at 1 second intervals, an acceptable
+    value would be between ((sleep_time-1.0), (sleep_time+1.0))."""
+    sub = StatestoreSubscriber()
+    sub.start().register().wait_for_heartbeat(1)
+    sub.kill()
+    sleep_time = randint(1, 9)
+    time.sleep(sleep_time)
+    subscribers = get_statestore_subscribers()["subscribers"]
+    for s in subscribers:
+      if str(s["id"]) == sub.subscriber_id:
+        secs_since_heartbeat = float(s["secs_since_heartbeat"])
+        assert (secs_since_heartbeat > float(sleep_time - 1.0))
+        assert (secs_since_heartbeat < float(sleep_time + 1.0))
+
   def test_topic_persistence(self):
     """Test that persistent topic entries survive subscriber failure, but transent topic
     entries are erased when the associated subscriber fails"""

http://git-wip-us.apache.org/repos/asf/impala/blob/8692bfbe/www/statestore_subscribers.tmpl
----------------------------------------------------------------------
diff --git a/www/statestore_subscribers.tmpl b/www/statestore_subscribers.tmpl
index f57b4f6..77b07dc 100644
--- a/www/statestore_subscribers.tmpl
+++ b/www/statestore_subscribers.tmpl
@@ -28,6 +28,7 @@ under the License.
   <th>Subscribed priority topics</th>
   <th>Transient entries</th>
   <th>Registration ID</th>
+  <th>Seconds since last heartbeat</th>
 </tr>
 
 {{#subscribers}}
@@ -38,6 +39,7 @@ under the License.
   <td>{{num_priority_topics}}</td>
   <td>{{num_transient}}</td>
   <td>{{registration_id}}</td>
+  <td>{{secs_since_heartbeat}}</td>
 </tr>
 {{/subscribers}}
 


[3/7] impala git commit: Bump CDH_BUILD_NUMBER to 554247

Posted by kw...@apache.org.
Bump CDH_BUILD_NUMBER to 554247

This build includes the new HBase 2.1.0 snapshot as well as the new
Kudu CDH tarballs, which a following patch will add functionality for
downloading and building with.

Testing:
- Ran a full exhaustive build.

Change-Id: Ieae9581997367895be5f1f8c730968d375182d94
Reviewed-on: http://gerrit.cloudera.org:8080/11349
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/88485880
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/88485880
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/88485880

Branch: refs/heads/master
Commit: 88485880a5e709d78d3a51324448b0f11ba71009
Parents: 17bfac0
Author: Thomas Tauber-Marshall <tm...@cloudera.com>
Authored: Tue Aug 28 19:53:35 2018 -0700
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Wed Aug 29 20:41:42 2018 +0000

----------------------------------------------------------------------
 bin/impala-config.sh | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/88485880/bin/impala-config.sh
----------------------------------------------------------------------
diff --git a/bin/impala-config.sh b/bin/impala-config.sh
index facb7f4..a77683e 100755
--- a/bin/impala-config.sh
+++ b/bin/impala-config.sh
@@ -162,9 +162,9 @@ unset IMPALA_KUDU_URL
 : ${CDH_DOWNLOAD_HOST:=native-toolchain.s3.amazonaws.com}
 export CDH_DOWNLOAD_HOST
 export CDH_MAJOR_VERSION=6
-export CDH_BUILD_NUMBER=537982
+export CDH_BUILD_NUMBER=554247
 export IMPALA_HADOOP_VERSION=3.0.0-cdh6.x-SNAPSHOT
-export IMPALA_HBASE_VERSION=2.0.0-cdh6.x-SNAPSHOT
+export IMPALA_HBASE_VERSION=2.1.0-cdh6.x-SNAPSHOT
 export IMPALA_HIVE_VERSION=2.1.1-cdh6.x-SNAPSHOT
 export IMPALA_SENTRY_VERSION=2.0.0-cdh6.x-SNAPSHOT
 export IMPALA_PARQUET_VERSION=1.9.0-cdh6.x-SNAPSHOT


[6/7] impala git commit: IMPALA-7502: ALTER TABLE RENAME should require ALL on the old table

Posted by kw...@apache.org.
IMPALA-7502: ALTER TABLE RENAME should require ALL on the old table

Prior to this patch, ALTER TABLE/VIEW RENAME required ALTER on the old
table. This may pose a potential security risk, such as having ALTER on
a table and ALL on a particular database allows a user to move the table
to a database with ALL, which will automatically grant that user with
ALL privilege on that table due to the privilege inherited from the
database. This patch fixes the issue by requiring ALL on the old table.
What this means is moving a table to a database with ALL privilege will
not elevate the privilege since ALL is now required for a table to be
renamed.

Testing:
- Ran all FE tests

Change-Id: I47a417a77df3f3030cf5f54fd2280b5e5e1fb77a
Reviewed-on: http://gerrit.cloudera.org:8080/11344
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/b97e5ba3
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/b97e5ba3
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/b97e5ba3

Branch: refs/heads/master
Commit: b97e5ba38dcb8128efc18dbc995307386b8d2210
Parents: e8210ab
Author: Fredy Wijaya <fw...@cloudera.com>
Authored: Tue Aug 28 14:07:16 2018 -0700
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Thu Aug 30 01:11:08 2018 +0000

----------------------------------------------------------------------
 .../analysis/AlterTableOrViewRenameStmt.java    |  2 +-
 .../apache/impala/analysis/AuditingTest.java    | 12 ++-
 .../impala/analysis/AuthorizationStmtTest.java  | 86 +++++++++++---------
 3 files changed, 57 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/b97e5ba3/fe/src/main/java/org/apache/impala/analysis/AlterTableOrViewRenameStmt.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/AlterTableOrViewRenameStmt.java b/fe/src/main/java/org/apache/impala/analysis/AlterTableOrViewRenameStmt.java
index 4cf2ff3..1ba4ae5 100644
--- a/fe/src/main/java/org/apache/impala/analysis/AlterTableOrViewRenameStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/AlterTableOrViewRenameStmt.java
@@ -70,7 +70,7 @@ public class AlterTableOrViewRenameStmt extends AlterTableStmt {
   @Override
   public void analyze(Analyzer analyzer) throws AnalysisException {
     newTableName_.analyze();
-    table_ = analyzer.getTable(tableName_, Privilege.ALTER);
+    table_ = analyzer.getTable(tableName_, Privilege.ALL);
     if (table_ instanceof FeView && renameTable_) {
       throw new AnalysisException(String.format(
           "ALTER TABLE not allowed on a view: %s", table_.getFullName()));

http://git-wip-us.apache.org/repos/asf/impala/blob/b97e5ba3/fe/src/test/java/org/apache/impala/analysis/AuditingTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/analysis/AuditingTest.java b/fe/src/test/java/org/apache/impala/analysis/AuditingTest.java
index 7735d3f..944fadb 100644
--- a/fe/src/test/java/org/apache/impala/analysis/AuditingTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/AuditingTest.java
@@ -259,18 +259,26 @@ public class AuditingTest extends FrontendTestBase {
         "ALTER TABLE functional_seq_snap.alltypes RENAME TO functional_seq_snap.t1");
     Assert.assertEquals(accessEvents, Sets.newHashSet(
         new TAccessEvent(
-            "functional_seq_snap.alltypes", TCatalogObjectType.TABLE, "ALTER"),
+            "functional_seq_snap.alltypes", TCatalogObjectType.TABLE, "ALL"),
         new TAccessEvent("functional_seq_snap.t1", TCatalogObjectType.TABLE, "CREATE")));
   }
 
   @Test
   public void TestAlterView() throws AnalysisException, AuthorizationException {
     Set<TAccessEvent> accessEvents = AnalyzeAccessEvents(
+        "ALTER VIEW functional_seq_snap.alltypes_view AS " +
+        "SELECT * FROM functional.alltypes");
+    Assert.assertEquals(accessEvents, Sets.newHashSet(
+        new TAccessEvent(
+            "functional_seq_snap.alltypes_view", TCatalogObjectType.VIEW, "ALTER"),
+        new TAccessEvent("functional.alltypes", TCatalogObjectType.TABLE, "SELECT")));
+
+    accessEvents = AnalyzeAccessEvents(
         "ALTER VIEW functional_seq_snap.alltypes_view " +
         "rename to functional_seq_snap.v1");
     Assert.assertEquals(accessEvents, Sets.newHashSet(
         new TAccessEvent(
-            "functional_seq_snap.alltypes_view", TCatalogObjectType.VIEW, "ALTER"),
+            "functional_seq_snap.alltypes_view", TCatalogObjectType.VIEW, "ALL"),
         new TAccessEvent("functional_seq_snap.v1", TCatalogObjectType.VIEW, "CREATE")));
   }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/b97e5ba3/fe/src/test/java/org/apache/impala/analysis/AuthorizationStmtTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/analysis/AuthorizationStmtTest.java b/fe/src/test/java/org/apache/impala/analysis/AuthorizationStmtTest.java
index 1edb066..6d301d9 100644
--- a/fe/src/test/java/org/apache/impala/analysis/AuthorizationStmtTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/AuthorizationStmtTest.java
@@ -2005,29 +2005,32 @@ public class AuthorizationStmtTest extends FrontendTestBase {
     }
 
     // Alter table rename.
-    authorize("alter table functional.alltypes rename to functional.new_table")
+    authorize("alter table functional.alltypes rename to functional_parquet.new_table")
         .ok(onServer(TPrivilegeLevel.ALL))
         .ok(onServer(TPrivilegeLevel.OWNER))
-        .ok(onServer(TPrivilegeLevel.ALTER, TPrivilegeLevel.CREATE))
-        .ok(onDatabase("functional", TPrivilegeLevel.ALL))
-        .ok(onDatabase("functional", TPrivilegeLevel.OWNER))
-        .ok(onDatabase("functional", TPrivilegeLevel.ALTER, TPrivilegeLevel.CREATE))
-        .ok(onDatabase("functional", TPrivilegeLevel.CREATE), onTable("functional",
-            "alltypes", TPrivilegeLevel.ALTER))
-        .error(alterError("functional.alltypes"))
-        .error(alterError("functional.alltypes"), onServer(allExcept(
-            TPrivilegeLevel.ALL, TPrivilegeLevel.OWNER, TPrivilegeLevel.ALTER,
-            TPrivilegeLevel.CREATE)))
-        .error(alterError("functional.alltypes"), onDatabase("functional", allExcept(
-            TPrivilegeLevel.ALL, TPrivilegeLevel.OWNER, TPrivilegeLevel.ALTER,
-            TPrivilegeLevel.CREATE)))
-        .error(alterError("functional.alltypes"), onDatabase("functional",
+        .ok(onDatabase("functional", TPrivilegeLevel.ALL),
+            onDatabase("functional_parquet", TPrivilegeLevel.CREATE))
+        .ok(onDatabase("functional_parquet", TPrivilegeLevel.CREATE),
+            onTable("functional", "alltypes", TPrivilegeLevel.ALL))
+        .ok(onDatabase("functional", TPrivilegeLevel.OWNER),
+            onDatabase("functional_parquet", TPrivilegeLevel.CREATE))
+        .ok(onDatabase("functional_parquet", TPrivilegeLevel.CREATE),
+            onTable("functional", "alltypes", TPrivilegeLevel.OWNER))
+        .error(accessError("functional.alltypes"))
+        .error(accessError("functional.alltypes"), onServer(allExcept(
+            TPrivilegeLevel.ALL, TPrivilegeLevel.OWNER, TPrivilegeLevel.CREATE)))
+        .error(accessError("functional.alltypes"), onDatabase("functional", allExcept(
+            TPrivilegeLevel.ALL, TPrivilegeLevel.OWNER)), onDatabase("functional_parquet",
+            TPrivilegeLevel.CREATE))
+        .error(createError("functional_parquet"), onDatabase("functional",
+            TPrivilegeLevel.ALL), onDatabase("functional_parquet", allExcept(
+            TPrivilegeLevel.ALL, TPrivilegeLevel.OWNER, TPrivilegeLevel.CREATE)))
+        .error(accessError("functional.alltypes"), onDatabase("functional",
             TPrivilegeLevel.CREATE), onTable("functional", "alltypes", allExcept(
-            TPrivilegeLevel.ALL, TPrivilegeLevel.OWNER, TPrivilegeLevel.ALTER)))
-        .error(createError("functional"), onDatabase("functional",
-            allExcept(TPrivilegeLevel.ALL, TPrivilegeLevel.OWNER,
-            TPrivilegeLevel.CREATE)),
-            onTable("functional", "alltypes", TPrivilegeLevel.ALTER));
+            TPrivilegeLevel.ALL, TPrivilegeLevel.OWNER)))
+        .error(createError("functional_parquet"), onDatabase("functional", allExcept(
+            TPrivilegeLevel.ALL, TPrivilegeLevel.OWNER, TPrivilegeLevel.CREATE)),
+            onTable("functional", "alltypes", TPrivilegeLevel.ALL));
 
     // Only for Kudu tables.
     for (AuthzTest test: new AuthzTest[]{
@@ -2156,29 +2159,32 @@ public class AuthorizationStmtTest extends FrontendTestBase {
     }
 
     // Alter view rename.
-    authorize("alter view functional.alltypes_view rename to functional.new_view")
+    authorize("alter view functional.alltypes_view rename to functional_parquet.new_view")
         .ok(onServer(TPrivilegeLevel.ALL))
         .ok(onServer(TPrivilegeLevel.OWNER))
-        .ok(onServer(TPrivilegeLevel.ALTER, TPrivilegeLevel.CREATE))
-        .ok(onDatabase("functional", TPrivilegeLevel.ALL))
-        .ok(onDatabase("functional", TPrivilegeLevel.OWNER))
-        .ok(onDatabase("functional", TPrivilegeLevel.ALTER, TPrivilegeLevel.CREATE))
-        .ok(onDatabase("functional", TPrivilegeLevel.CREATE), onTable("functional",
-            "alltypes_view", TPrivilegeLevel.ALTER))
-        .error(alterError("functional.alltypes_view"))
-        .error(alterError("functional.alltypes_view"), onServer(allExcept(
-            TPrivilegeLevel.ALL, TPrivilegeLevel.OWNER, TPrivilegeLevel.ALTER,
-            TPrivilegeLevel.CREATE)))
-        .error(alterError("functional.alltypes_view"), onDatabase("functional", allExcept(
-            TPrivilegeLevel.ALL, TPrivilegeLevel.OWNER, TPrivilegeLevel.ALTER,
-            TPrivilegeLevel.CREATE)))
-        .error(alterError("functional.alltypes_view"), onDatabase("functional",
+        .ok(onDatabase("functional", TPrivilegeLevel.ALL),
+            onDatabase("functional_parquet", TPrivilegeLevel.CREATE))
+        .ok(onDatabase("functional_parquet", TPrivilegeLevel.CREATE),
+            onTable("functional", "alltypes_view", TPrivilegeLevel.ALL))
+        .ok(onDatabase("functional", TPrivilegeLevel.OWNER),
+            onDatabase("functional_parquet", TPrivilegeLevel.CREATE))
+        .ok(onDatabase("functional_parquet", TPrivilegeLevel.CREATE),
+            onTable("functional", "alltypes_view", TPrivilegeLevel.OWNER))
+        .error(accessError("functional.alltypes_view"))
+        .error(accessError("functional.alltypes_view"), onServer(allExcept(
+            TPrivilegeLevel.ALL, TPrivilegeLevel.OWNER, TPrivilegeLevel.CREATE)))
+        .error(accessError("functional.alltypes_view"), onDatabase("functional",
+            allExcept(TPrivilegeLevel.ALL, TPrivilegeLevel.OWNER)),
+            onDatabase("functional_parquet", TPrivilegeLevel.CREATE))
+        .error(createError("functional_parquet"), onDatabase("functional",
+            TPrivilegeLevel.ALL), onDatabase("functional_parquet", allExcept(
+            TPrivilegeLevel.ALL, TPrivilegeLevel.OWNER, TPrivilegeLevel.CREATE)))
+        .error(accessError("functional.alltypes_view"), onDatabase("functional",
             TPrivilegeLevel.CREATE), onTable("functional", "alltypes_view", allExcept(
-            TPrivilegeLevel.ALL, TPrivilegeLevel.OWNER, TPrivilegeLevel.ALTER)))
-        .error(createError("functional"), onDatabase("functional",
-            allExcept(TPrivilegeLevel.ALL, TPrivilegeLevel.OWNER,
-            TPrivilegeLevel.CREATE)), onTable("functional", "alltypes_view",
-            TPrivilegeLevel.ALTER));
+            TPrivilegeLevel.ALL, TPrivilegeLevel.OWNER)))
+        .error(createError("functional_parquet"), onDatabase("functional", allExcept(
+            TPrivilegeLevel.ALL, TPrivilegeLevel.OWNER, TPrivilegeLevel.CREATE)),
+            onTable("functional", "alltypes_view", TPrivilegeLevel.ALL));
 
     // Alter view with constant select.
     authorize("alter view functional.alltypes_view as select 1")


[5/7] impala git commit: Bump FE pom to Java 8 source/target version

Posted by kw...@apache.org.
Bump FE pom to Java 8 source/target version

Our dependency on Hadoop 3 means we already required Java 8. This just
fixes the pom so we are compiling to Java 8 classes. This will also
allow us to start using some Java 8 features like lambdas for more
concise code.

Change-Id: I0a5e4cf3f4171eecf218f6d4dd7cdfece9dc9152
Reviewed-on: http://gerrit.cloudera.org:8080/11351
Reviewed-by: Philip Zeyliger <ph...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/e8210ab2
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/e8210ab2
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/e8210ab2

Branch: refs/heads/master
Commit: e8210ab201f1fad874a4b81b418ad36ac71add41
Parents: 8692bfb
Author: Todd Lipcon <to...@apache.org>
Authored: Wed Aug 29 11:57:36 2018 -0700
Committer: Todd Lipcon <to...@apache.org>
Committed: Wed Aug 29 23:10:45 2018 +0000

----------------------------------------------------------------------
 common/yarn-extras/pom.xml      | 4 ++--
 ext-data-source/api/pom.xml     | 4 ++--
 ext-data-source/sample/pom.xml  | 4 ++--
 ext-data-source/test/pom.xml    | 4 ++--
 fe/pom.xml                      | 4 ++--
 testdata/TableFlattener/pom.xml | 4 ++--
 testdata/pom.xml                | 4 ++--
 tests/test-hive-udfs/pom.xml    | 4 ++--
 8 files changed, 16 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/e8210ab2/common/yarn-extras/pom.xml
----------------------------------------------------------------------
diff --git a/common/yarn-extras/pom.xml b/common/yarn-extras/pom.xml
index 74a5fee..48a609a 100644
--- a/common/yarn-extras/pom.xml
+++ b/common/yarn-extras/pom.xml
@@ -83,8 +83,8 @@
         <artifactId>maven-compiler-plugin</artifactId>
         <version>3.3</version>
         <configuration>
-          <source>1.7</source>
-          <target>1.7</target>
+          <source>1.8</source>
+          <target>1.8</target>
         </configuration>
       </plugin>
     </plugins>

http://git-wip-us.apache.org/repos/asf/impala/blob/e8210ab2/ext-data-source/api/pom.xml
----------------------------------------------------------------------
diff --git a/ext-data-source/api/pom.xml b/ext-data-source/api/pom.xml
index 15bfd55..82e6a88 100644
--- a/ext-data-source/api/pom.xml
+++ b/ext-data-source/api/pom.xml
@@ -61,8 +61,8 @@
         <artifactId>maven-compiler-plugin</artifactId>
         <version>3.3</version>
         <configuration>
-          <source>1.7</source>
-          <target>1.7</target>
+          <source>1.8</source>
+          <target>1.8</target>
         </configuration>
       </plugin>
 

http://git-wip-us.apache.org/repos/asf/impala/blob/e8210ab2/ext-data-source/sample/pom.xml
----------------------------------------------------------------------
diff --git a/ext-data-source/sample/pom.xml b/ext-data-source/sample/pom.xml
index c935549..c0df755 100644
--- a/ext-data-source/sample/pom.xml
+++ b/ext-data-source/sample/pom.xml
@@ -66,8 +66,8 @@
         <artifactId>maven-compiler-plugin</artifactId>
         <version>3.3</version>
         <configuration>
-          <source>1.7</source>
-          <target>1.7</target>
+          <source>1.8</source>
+          <target>1.8</target>
         </configuration>
       </plugin>
     </plugins>

http://git-wip-us.apache.org/repos/asf/impala/blob/e8210ab2/ext-data-source/test/pom.xml
----------------------------------------------------------------------
diff --git a/ext-data-source/test/pom.xml b/ext-data-source/test/pom.xml
index b8e2601..c96c86a 100644
--- a/ext-data-source/test/pom.xml
+++ b/ext-data-source/test/pom.xml
@@ -57,8 +57,8 @@
         <artifactId>maven-compiler-plugin</artifactId>
         <version>3.3</version>
         <configuration>
-          <source>1.7</source>
-          <target>1.7</target>
+          <source>1.8</source>
+          <target>1.8</target>
         </configuration>
       </plugin>
     </plugins>

http://git-wip-us.apache.org/repos/asf/impala/blob/e8210ab2/fe/pom.xml
----------------------------------------------------------------------
diff --git a/fe/pom.xml b/fe/pom.xml
index 896dce5..5199d4f 100644
--- a/fe/pom.xml
+++ b/fe/pom.xml
@@ -514,8 +514,8 @@ under the License.
         <artifactId>maven-compiler-plugin</artifactId>
         <version>3.3</version>
         <configuration>
-          <source>1.7</source>
-          <target>1.7</target>
+          <source>1.8</source>
+          <target>1.8</target>
         </configuration>
       </plugin>
 

http://git-wip-us.apache.org/repos/asf/impala/blob/e8210ab2/testdata/TableFlattener/pom.xml
----------------------------------------------------------------------
diff --git a/testdata/TableFlattener/pom.xml b/testdata/TableFlattener/pom.xml
index 527175b..cff5465 100644
--- a/testdata/TableFlattener/pom.xml
+++ b/testdata/TableFlattener/pom.xml
@@ -39,8 +39,8 @@
         <artifactId>maven-compiler-plugin</artifactId>
         <version>3.3</version>
         <configuration>
-          <source>1.7</source>
-          <target>1.7</target>
+          <source>1.8</source>
+          <target>1.8</target>
         </configuration>
       </plugin>
     </plugins>

http://git-wip-us.apache.org/repos/asf/impala/blob/e8210ab2/testdata/pom.xml
----------------------------------------------------------------------
diff --git a/testdata/pom.xml b/testdata/pom.xml
index 863704e..74ce872 100644
--- a/testdata/pom.xml
+++ b/testdata/pom.xml
@@ -170,8 +170,8 @@ under the License.
         <artifactId>maven-compiler-plugin</artifactId>
         <version>3.3</version>
         <configuration>
-          <source>1.7</source>
-          <target>1.7</target>
+          <source>1.8</source>
+          <target>1.8</target>
         </configuration>
       </plugin>
 

http://git-wip-us.apache.org/repos/asf/impala/blob/e8210ab2/tests/test-hive-udfs/pom.xml
----------------------------------------------------------------------
diff --git a/tests/test-hive-udfs/pom.xml b/tests/test-hive-udfs/pom.xml
index 30b31b5..eb3ff82 100644
--- a/tests/test-hive-udfs/pom.xml
+++ b/tests/test-hive-udfs/pom.xml
@@ -84,8 +84,8 @@ under the License.
         <artifactId>maven-compiler-plugin</artifactId>
         <version>3.3</version>
         <configuration>
-          <source>1.7</source>
-          <target>1.7</target>
+          <source>1.8</source>
+          <target>1.8</target>
         </configuration>
       </plugin>
     </plugins>


[2/7] impala git commit: IMPALA-7500: [DOCS] Clarify the workaround for IMPALA-635

Posted by kw...@apache.org.
IMPALA-7500: [DOCS] Clarify the workaround for IMPALA-635

Change-Id: I6ce1fab79a8e77c3a9060d1363fc093a261d4e84
Reviewed-on: http://gerrit.cloudera.org:8080/11346
Tested-by: Impala Public Jenkins <im...@cloudera.com>
Reviewed-by: Alex Rodoni <ar...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/17bfac00
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/17bfac00
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/17bfac00

Branch: refs/heads/master
Commit: 17bfac00f79219d1cd06be436f6f78b974c6811b
Parents: 5474230
Author: Alex Rodoni <ar...@cloudera.com>
Authored: Tue Aug 28 16:01:27 2018 -0700
Committer: Alex Rodoni <ar...@cloudera.com>
Committed: Wed Aug 29 00:11:22 2018 +0000

----------------------------------------------------------------------
 docs/topics/impala_known_issues.xml | 19 +++++++++----------
 1 file changed, 9 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/17bfac00/docs/topics/impala_known_issues.xml
----------------------------------------------------------------------
diff --git a/docs/topics/impala_known_issues.xml b/docs/topics/impala_known_issues.xml
index 662132a..b7c439d 100644
--- a/docs/topics/impala_known_issues.xml
+++ b/docs/topics/impala_known_issues.xml
@@ -617,8 +617,10 @@ ALTER TABLE table_name SET TBLPROPERTIES('EXTERNAL'='TRUE');
       <conbody>
 
         <p>
-          Querying certain Avro tables could cause a crash or return no rows, even though Impala
-          could <codeph>DESCRIBE</codeph> the table.
+          The default value in Avro schema must match type of first union
+          type, e.g. if the default value is <codeph>null</codeph>, then the
+          first type in the <codeph>UNION</codeph> must be
+            <codeph>"null"</codeph>.
         </p>
 
         <p>
@@ -626,14 +628,11 @@ ALTER TABLE table_name SET TBLPROPERTIES('EXTERNAL'='TRUE');
         </p>
 
         <p>
-          <b>Workaround:</b> Swap the order of the fields in the schema specification. For
-          example, <codeph>["null", "string"]</codeph> instead of <codeph>["string",
-          "null"]</codeph>.
-        </p>
-
-        <p>
-          <b>Resolution:</b> Not allowing this syntax agrees with the Avro specification, so it
-          may still cause an error even when the crashing issue is resolved.
+          <b>Workaround:</b>Swap the order of the fields in the schema
+          specification. For example, use <codeph>["null", "string"]</codeph>
+          instead of <codeph>["string", "null"]</codeph>. Note that the files
+          written with the problematic schema must be rewritten with the new
+          schema because Avro files have embedded schemas.
         </p>
 
       </conbody>