You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by st...@apache.org on 2021/03/13 02:24:29 UTC

[impala] 05/08: IMPALA-10522: Support external use of frontend libraries

This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit b3d95d8f32c33c60657d6f518fe87a624509a618
Author: Kurt Deschler <kd...@cloudera.com>
AuthorDate: Thu Mar 26 10:35:36 2020 -0500

    IMPALA-10522: Support external use of frontend libraries
    
    This patch enables the Impala frontend jar and dependent library
    libfesupport.so to be used by an external Java frontend.
    
    Calling FeSupport.setExternalFE() before FeSupport.loadLibrary() causes
    external frontend initialization mode to be used when loadLibrary()
    initializes the native library. This mode builds upon the logic used
    to initialize the frontend jar for unit tests.
    
    Initialization in external frontend mode differs as follows:
    
    - Skip instantiating the Frontend object and its dependents
    - Skip loading libhdfs
    - Skip starting JVM Pause monitor
    - Disable Minidumper
    - Initialize TimezoneDatabase for external frontends
    - Disable redirect of stderr/stdout to libfesupport.so glog
    - Log messages from libfesupport.so to stderr
    - Use libfesupport.so for JNI symbol look up
    
    Null checks were added in places where objects were assumed to be
    instantiated but are now skipped during initialization.
    
    Additional change:
    1) Add the directory containing libfesupport.so to JAVA_LIBRARY_PATH
       in the test driver (testdata/bin/run-hive-server.sh)
    
    Testing:
     - Initialized the frontend jar from an external frontend
     - Verified that frontend Java objects can be used externally without
       issues
     - Verified that exceptions thrown from Impala Java code or libfesupport
       can be caught or propagated correctly by the external frontend
     - Manually verified minicluster logs
     - Ran queries with an external frontend
    
    Co-authored-by: John Sherman <jf...@cloudera.com>
    Co-authored-by: Aman Sinha <am...@cloudera.com>
    
    Change-Id: I4e3a84721ba196ec00773ce2923b19610b90edd9
    Reviewed-on: http://gerrit.cloudera.org:8080/17115
    Reviewed-by: Thomas Tauber-Marshall <tm...@cloudera.com>
    Tested-by: Thomas Tauber-Marshall <tm...@cloudera.com>
---
 be/src/benchmarks/expr-benchmark.cc                |  2 +-
 be/src/common/init.cc                              | 33 ++++++++++++++++++---
 be/src/common/init.h                               |  4 ++-
 be/src/runtime/data-stream-test.cc                 |  2 +-
 be/src/runtime/exec-env.cc                         | 34 ++++++++++++----------
 be/src/runtime/exec-env.h                          | 15 +++++++---
 be/src/runtime/lib-cache.cc                        | 11 ++++---
 be/src/runtime/lib-cache.h                         |  4 +--
 be/src/service/fe-support.cc                       | 17 ++++++-----
 be/src/util/jni-util.cc                            |  2 --
 .../java/org/apache/impala/service/FeSupport.java  | 18 +++++++++---
 testdata/bin/run-hive-server.sh                    |  3 ++
 12 files changed, 99 insertions(+), 46 deletions(-)

diff --git a/be/src/benchmarks/expr-benchmark.cc b/be/src/benchmarks/expr-benchmark.cc
index 689295f..f915096 100644
--- a/be/src/benchmarks/expr-benchmark.cc
+++ b/be/src/benchmarks/expr-benchmark.cc
@@ -67,7 +67,7 @@ class Planner {
  public:
   Planner() {
     frontend_.SetCatalogIsReady();
-    ABORT_IF_ERROR(exec_env_.InitForFeTests());
+    ABORT_IF_ERROR(exec_env_.InitForFeSupport());
   }
 
   ~Planner() {
diff --git a/be/src/common/init.cc b/be/src/common/init.cc
index 452a0af..3b67247 100644
--- a/be/src/common/init.cc
+++ b/be/src/common/init.cc
@@ -27,6 +27,7 @@
 #include "common/status.h"
 #include "exec/kudu-util.h"
 #include "exprs/scalar-expr-evaluator.h"
+#include "exprs/timezone_db.h"
 #include "gutil/atomicops.h"
 #include "gutil/strings/substitute.h"
 #include "rpc/authentication.h"
@@ -72,6 +73,7 @@ DECLARE_int32(logbufsecs);
 DECLARE_int32(max_log_files);
 DECLARE_int32(max_minidumps);
 DECLARE_string(redaction_rules_file);
+DECLARE_bool(redirect_stdout_stderr);
 DECLARE_string(reserved_words_version);
 DECLARE_bool(symbolize_stacktrace);
 
@@ -252,7 +254,7 @@ void BlockImpalaShutdownSignal() {
 }
 
 void impala::InitCommonRuntime(int argc, char** argv, bool init_jvm,
-    TestInfo::Mode test_mode) {
+    TestInfo::Mode test_mode, bool external_fe) {
   srand(time(NULL));
   BlockImpalaShutdownSignal();
 
@@ -275,6 +277,16 @@ void impala::InitCommonRuntime(int argc, char** argv, bool init_jvm,
 # else
   FLAGS_symbolize_stacktrace = true;
 #endif
+
+  if (external_fe) {
+    // Change defaults for flags when loaded as part of external frontend.
+    // Write logs to stderr by default (otherwise logs get written to
+    // FeSupport.INFO/ERROR).
+    FLAGS_logtostderr = true;
+    // Do not redirect stdout/stderr by default.
+    FLAGS_redirect_stdout_stderr = false;
+  }
+
   google::SetVersionString(impala::GetBuildVersion());
   google::ParseCommandLineFlags(&argc, &argv, true);
   if (!FLAGS_redaction_rules_file.empty()) {
@@ -298,7 +310,9 @@ void impala::InitCommonRuntime(int argc, char** argv, bool init_jvm,
   }
   impala::InitGoogleLoggingSafe(argv[0]);
   // Breakpad needs flags and logging to initialize.
-  ABORT_IF_ERROR(RegisterMinidump(argv[0]));
+  if (!external_fe) {
+    ABORT_IF_ERROR(RegisterMinidump(argv[0]));
+  }
 #ifndef THREAD_SANITIZER
 #ifndef __aarch64__
   AtomicOps_x86CPUFeaturesInit();
@@ -360,14 +374,19 @@ void impala::InitCommonRuntime(int argc, char** argv, bool init_jvm,
   }
 
   // Required for the FE's Catalog
-  ABORT_IF_ERROR(impala::LibCache::Init());
+  ABORT_IF_ERROR(impala::LibCache::Init(external_fe));
   Status fs_cache_init_status = impala::HdfsFsCache::Init();
   if (!fs_cache_init_status.ok()) CLEAN_EXIT_WITH_ERROR(fs_cache_init_status.GetDetail());
 
   if (init_jvm) {
+    if (!external_fe) {
+      JniUtil::InitLibhdfs();
+    }
     ABORT_IF_ERROR(JniUtil::Init());
     InitJvmLoggingSupport();
-    ABORT_IF_ERROR(JniUtil::InitJvmPauseMonitor());
+    if (!external_fe) {
+      ABORT_IF_ERROR(JniUtil::InitJvmPauseMonitor());
+    }
     ZipUtil::InitJvm();
   }
 
@@ -398,6 +417,12 @@ void impala::InitCommonRuntime(int argc, char** argv, bool init_jvm,
     error_msg << "Failed to register action for SIGTERM: " << GetStrErrMsg();
     CLEAN_EXIT_WITH_ERROR(error_msg.str());
   }
+
+  if (external_fe) {
+    // Explicitly load the timezone database for external FEs. Impala daemons load it
+    // through ImpaladMain
+    ABORT_IF_ERROR(TimezoneDatabase::Initialize());
+  }
 }
 
 Status impala::StartMemoryMaintenanceThread() {
diff --git a/be/src/common/init.h b/be/src/common/init.h
index ff0194e..e55c32b 100644
--- a/be/src/common/init.h
+++ b/be/src/common/init.h
@@ -32,8 +32,10 @@ namespace impala {
 /// logging directory, and enable special test-specific behavior.
 /// Callers that want to override default gflags variables should do so before calling
 /// this method. No logging should be performed until after this method returns.
+/// Passing external_fe=true causes specific initialization steps to be skipped
+/// that an external frontend will have already performed.
 void InitCommonRuntime(int argc, char** argv, bool init_jvm,
-    TestInfo::Mode m = TestInfo::NON_TEST);
+    TestInfo::Mode m = TestInfo::NON_TEST, bool external_fe = false);
 
 /// Starts background memory maintenance thread. Must be called after
 /// RegisterMemoryMetrics(). This thread is needed for daemons to free memory and
diff --git a/be/src/runtime/data-stream-test.cc b/be/src/runtime/data-stream-test.cc
index 21c273f..a8dd51a 100644
--- a/be/src/runtime/data-stream-test.cc
+++ b/be/src/runtime/data-stream-test.cc
@@ -151,7 +151,7 @@ class DataStreamTest : public testing::Test {
 
   virtual void SetUp() {
     exec_env_.reset(new ExecEnv());
-    ABORT_IF_ERROR(exec_env_->InitForFeTests());
+    ABORT_IF_ERROR(exec_env_->InitForFeSupport());
     exec_env_->InitBufferPool(32 * 1024, 1024 * 1024 * 1024, 32 * 1024);
     runtime_state_.reset(new RuntimeState(TQueryCtx(), exec_env_.get()));
     TPlanFragment* fragment = runtime_state_->obj_pool()->Add(new TPlanFragment());
diff --git a/be/src/runtime/exec-env.cc b/be/src/runtime/exec-env.cc
index fe06948..c3c8f06 100644
--- a/be/src/runtime/exec-env.cc
+++ b/be/src/runtime/exec-env.cc
@@ -229,12 +229,12 @@ struct ExecEnv::KuduClientPtr {
 
 ExecEnv* ExecEnv::exec_env_ = nullptr;
 
-ExecEnv::ExecEnv()
+ExecEnv::ExecEnv(bool external_fe)
   : ExecEnv(FLAGS_krpc_port, FLAGS_state_store_subscriber_port, FLAGS_webserver_port,
-        FLAGS_state_store_host, FLAGS_state_store_port) {}
+        FLAGS_state_store_host, FLAGS_state_store_port, external_fe) {}
 
 ExecEnv::ExecEnv(int krpc_port, int subscriber_port, int webserver_port,
-    const string& statestore_host, int statestore_port)
+    const string& statestore_host, int statestore_port, bool external_fe)
   : obj_pool_(new ObjectPool),
     metrics_(new MetricGroup("impala-metrics")),
     // Create the CatalogServiceClientCache with num_retries = 1 and wait_ms = 0.
@@ -250,11 +250,12 @@ ExecEnv::ExecEnv(int krpc_port, int subscriber_port, int webserver_port,
     pool_mem_trackers_(new PoolMemTrackerRegistry),
     thread_mgr_(new ThreadResourceMgr),
     tmp_file_mgr_(new TmpFileMgr),
-    frontend_(new Frontend()),
+    frontend_(external_fe ? nullptr : new Frontend()),
     async_rpc_pool_(new CallableThreadPool("rpc-pool", "async-rpc-sender", 8, 10000)),
     query_exec_mgr_(new QueryExecMgr()),
     rpc_metrics_(metrics_->GetOrCreateChildGroup("rpc")),
     enable_webserver_(FLAGS_enable_webserver && webserver_port > 0),
+    external_fe_(external_fe),
     configured_backend_address_(MakeNetworkAddress(FLAGS_hostname, krpc_port)) {
   UUIDToUniqueIdPB(boost::uuids::random_generator()(), &backend_id_);
 
@@ -321,7 +322,7 @@ ExecEnv::~ExecEnv() {
   disk_io_mgr_.reset(); // Need to tear down before mem_tracker_.
 }
 
-Status ExecEnv::InitForFeTests() {
+Status ExecEnv::InitForFeSupport() {
   mem_tracker_.reset(new MemTracker(-1, "Process"));
   is_fe_tests_ = true;
   return Status::OK();
@@ -469,7 +470,7 @@ Status ExecEnv::Init() {
   }
 
   RETURN_IF_ERROR(cluster_membership_mgr_->Init());
-  if (FLAGS_is_coordinator) {
+  if (FLAGS_is_coordinator && frontend_ != nullptr) {
     cluster_membership_mgr_->RegisterUpdateCallbackFn(
         [this](ClusterMembershipMgr::SnapshotPtr snapshot) {
           SendClusterMembershipToFrontend(snapshot, this->frontend());
@@ -482,15 +483,18 @@ Status ExecEnv::Init() {
 }
 
 Status ExecEnv::InitHadoopConfig() {
-  // Get the fs.defaultFS value set in core-site.xml and assign it to configured_defaultFs
-  TGetHadoopConfigRequest config_request;
-  config_request.__set_name(DEFAULT_FS);
-  TGetHadoopConfigResponse config_response;
-  RETURN_IF_ERROR(frontend_->GetHadoopConfig(config_request, &config_response));
-  if (config_response.__isset.value) {
-    default_fs_ = config_response.value;
-  } else {
-    default_fs_ = "hdfs://";
+  if (frontend_ != nullptr) {
+    // Get the fs.defaultFS value set in core-site.xml and assign it to
+    // configured_defaultFs
+    TGetHadoopConfigRequest config_request;
+    config_request.__set_name(DEFAULT_FS);
+    TGetHadoopConfigResponse config_response;
+    RETURN_IF_ERROR(frontend_->GetHadoopConfig(config_request, &config_response));
+    if (config_response.__isset.value) {
+      default_fs_ = config_response.value;
+    } else {
+      default_fs_ = "hdfs://";
+    }
   }
   return Status::OK();
 }
diff --git a/be/src/runtime/exec-env.h b/be/src/runtime/exec-env.h
index e378d5b..192b7a4 100644
--- a/be/src/runtime/exec-env.h
+++ b/be/src/runtime/exec-env.h
@@ -81,10 +81,13 @@ namespace io {
 /// ExecEnv::GetInstance().
 class ExecEnv {
  public:
-  ExecEnv();
+  /// If external_fe = true, some members (e.g. frontend_) are not used and will
+  /// be initialized to null.
+  /// TODO: Split out common logic into base class and eliminate null pointers.
+  ExecEnv(bool external_fe = false);
 
   ExecEnv(int krpc_port, int subscriber_port, int webserver_port,
-      const std::string& statestore_host, int statestore_port);
+      const std::string& statestore_host, int statestore_port, bool external_fe = false);
 
   /// Returns the most recently created exec env instance. In a normal impalad, this is
   /// the only instance. In test setups with multiple ExecEnv's per process,
@@ -128,7 +131,10 @@ class ExecEnv {
   HdfsOpThreadPool* hdfs_op_thread_pool() { return hdfs_op_thread_pool_.get(); }
   TmpFileMgr* tmp_file_mgr() { return tmp_file_mgr_.get(); }
   ImpalaServer* impala_server() { return impala_server_; }
-  Frontend* frontend() { return frontend_.get(); }
+  Frontend* frontend() {
+    DCHECK(frontend_.get() != nullptr);
+    return frontend_.get();
+  }
   RequestPoolService* request_pool_service() { return request_pool_service_.get(); }
   CallableThreadPool* rpc_pool() { return async_rpc_pool_.get(); }
   QueryExecMgr* query_exec_mgr() { return query_exec_mgr_.get(); }
@@ -154,7 +160,7 @@ class ExecEnv {
   const TNetworkAddress& krpc_address() const { return krpc_address_; }
 
   /// Initializes the exec env for running FE tests.
-  Status InitForFeTests() WARN_UNUSED_RESULT;
+  Status InitForFeSupport() WARN_UNUSED_RESULT;
 
   /// Returns true if this environment was created from the FE tests. This makes the
   /// environment special since the JVM is started first and libraries are loaded
@@ -229,6 +235,7 @@ class ExecEnv {
   MetricGroup* rpc_metrics_ = nullptr;
 
   bool enable_webserver_;
+  bool external_fe_;
 
  private:
   friend class TestEnv;
diff --git a/be/src/runtime/lib-cache.cc b/be/src/runtime/lib-cache.cc
index c8084eb..838cef7 100644
--- a/be/src/runtime/lib-cache.cc
+++ b/be/src/runtime/lib-cache.cc
@@ -102,14 +102,17 @@ LibCache::~LibCache() {
   if (current_process_handle_ != nullptr) DynamicClose(current_process_handle_);
 }
 
-Status LibCache::Init() {
+Status LibCache::Init(bool external_fe) {
   DCHECK(LibCache::instance_.get() == nullptr);
   LibCache::instance_.reset(new LibCache());
-  return LibCache::instance_->InitInternal();
+  return LibCache::instance_->InitInternal(external_fe);
 }
 
-Status LibCache::InitInternal() {
-  if (TestInfo::is_fe_test()) {
+Status LibCache::InitInternal(bool external_fe) {
+  if (external_fe) {
+    LOG(INFO) << "Library cache is using shared object for process handle";
+    RETURN_IF_ERROR(DynamicOpen("libfesupport.so", &current_process_handle_));
+  } else if (TestInfo::is_fe_test()) {
     // In the FE tests, nullptr gives the handle to the java process.
     // Explicitly load the fe-support shared object.
     string fe_support_path;
diff --git a/be/src/runtime/lib-cache.h b/be/src/runtime/lib-cache.h
index 69a6c77..5f09a9e 100644
--- a/be/src/runtime/lib-cache.h
+++ b/be/src/runtime/lib-cache.h
@@ -72,7 +72,7 @@ class LibCache {
   ~LibCache();
 
   /// Initializes the libcache. Must be called before any other APIs.
-  static Status Init();
+  static Status Init(bool external_fe);
 
   /// Gets the local 'path' used to cache the file stored at the global 'hdfs_lib_file'. If
   /// this file is not already on the local fs, or if the cached entry's last modified
@@ -156,7 +156,7 @@ class LibCache {
   LibCache(LibCache const& l); // disable copy ctor
   LibCache& operator=(LibCache const& l); // disable assignment
 
-  Status InitInternal();
+  Status InitInternal(bool external_fe);
 
   /// Returns the cache entry for 'hdfs_lib_file'. If this library has not been
   /// copied locally, it will copy it and add a new LibCacheEntry to 'lib_cache_'.
diff --git a/be/src/service/fe-support.cc b/be/src/service/fe-support.cc
index 0a25997..c0eb3f0 100644
--- a/be/src/service/fe-support.cc
+++ b/be/src/service/fe-support.cc
@@ -63,23 +63,24 @@ using namespace apache::thrift::server;
 
 static bool fe_support_disable_codegen = true;
 
-// Called from the FE when it explicitly loads libfesupport.so for tests.
+// Called from tests or external FE after it explicitly loads libfesupport.so.
 // This creates the minimal state necessary to service the other JNI calls.
 // This is not called when we first start up the BE.
 extern "C"
 JNIEXPORT void JNICALL
-Java_org_apache_impala_service_FeSupport_NativeFeTestInit(
-    JNIEnv* env, jclass fe_support_class) {
+Java_org_apache_impala_service_FeSupport_NativeFeInit(
+    JNIEnv* env, jclass fe_support_class, bool external_fe) {
   DCHECK(ExecEnv::GetInstance() == NULL) << "This should only be called once from the FE";
   char* env_logs_dir_str = std::getenv("IMPALA_FE_TEST_LOGS_DIR");
   if (env_logs_dir_str != nullptr) FLAGS_log_dir = env_logs_dir_str;
   char* name = const_cast<char*>("FeSupport");
   // Init the JVM to load the classes in JniUtil that are needed for returning
   // exceptions to the FE.
-  InitCommonRuntime(1, &name, true, TestInfo::FE_TEST);
+  InitCommonRuntime(1, &name, true,
+      external_fe ? TestInfo::NON_TEST : TestInfo::FE_TEST, external_fe);
   THROW_IF_ERROR(LlvmCodeGen::InitializeLlvm(true), env, JniUtil::internal_exc_class());
-  ExecEnv* exec_env = new ExecEnv(); // This also caches it from the process.
-  THROW_IF_ERROR(exec_env->InitForFeTests(), env, JniUtil::internal_exc_class());
+  ExecEnv* exec_env = new ExecEnv(external_fe); // This also caches it from the process.
+  THROW_IF_ERROR(exec_env->InitForFeSupport(), env, JniUtil::internal_exc_class());
 }
 
 // Serializes expression value 'value' to thrift structure TColumnValue 'col_val'.
@@ -692,8 +693,8 @@ namespace impala {
 
 static JNINativeMethod native_methods[] = {
   {
-      const_cast<char*>("NativeFeTestInit"), const_cast<char*>("()V"),
-      (void*)::Java_org_apache_impala_service_FeSupport_NativeFeTestInit
+      const_cast<char*>("NativeFeInit"), const_cast<char*>("(Z)V"),
+      (void*)::Java_org_apache_impala_service_FeSupport_NativeFeInit
   },
   {
       const_cast<char*>("NativeEvalExprsWithoutRow"), const_cast<char*>("([B[BJ)[B"),
diff --git a/be/src/util/jni-util.cc b/be/src/util/jni-util.cc
index 62df916..8792f78 100644
--- a/be/src/util/jni-util.cc
+++ b/be/src/util/jni-util.cc
@@ -129,8 +129,6 @@ Status JniUtil::LocalToGlobalRef(JNIEnv* env, jobject local_ref, jobject* global
 }
 
 Status JniUtil::Init() {
-  InitLibhdfs();
-
   // Get the JNIEnv* corresponding to current thread.
   JNIEnv* env = JniUtil::GetJNIEnv();
   if (env == NULL) return Status("Failed to get/create JVM");
diff --git a/fe/src/main/java/org/apache/impala/service/FeSupport.java b/fe/src/main/java/org/apache/impala/service/FeSupport.java
index bbca6bd..0fabc12 100644
--- a/fe/src/main/java/org/apache/impala/service/FeSupport.java
+++ b/fe/src/main/java/org/apache/impala/service/FeSupport.java
@@ -69,10 +69,11 @@ import com.google.common.base.Preconditions;
 public class FeSupport {
   private final static Logger LOG = LoggerFactory.getLogger(FeSupport.class);
   private static boolean loaded_ = false;
+  private static boolean externalFE_ = false;
 
-  // Only called if this library is explicitly loaded. This only happens
-  // when running FE tests.
-  public native static void NativeFeTestInit();
+  // Only called if this library is explicitly loaded. This happens
+  // when running FE tests or when used by an external FE.
+  public native static void NativeFeInit(boolean externalFE);
 
   // Returns a serialized TResultRow
   public native static byte[] NativeEvalExprsWithoutRow(
@@ -471,6 +472,15 @@ public class FeSupport {
   }
 
   /**
+   * Calling this function before loadLibrary() causes external frontend
+   * initialization to be used during NativeFeInit()
+   */
+  public static synchronized void setExternalFE() {
+    Preconditions.checkState(!loaded_);
+    externalFE_ = true;
+  }
+
+  /**
    * This function should be called explicitly by the FeSupport to ensure that
    * native functions are loaded. Tests that depend on JniCatalog or JniFrontend
    * being instantiated should also call this function.
@@ -481,6 +491,6 @@ public class FeSupport {
     NativeLibUtil.loadLibrary("libfesupport.so");
     LOG.info("Loaded libfesupport.so");
     loaded_ = true;
-    NativeFeTestInit();
+    NativeFeInit(externalFE_);
   }
 }
diff --git a/testdata/bin/run-hive-server.sh b/testdata/bin/run-hive-server.sh
index fda2ce1..25986c5 100755
--- a/testdata/bin/run-hive-server.sh
+++ b/testdata/bin/run-hive-server.sh
@@ -120,6 +120,9 @@ HADOOP_CLIENT_OPTS="-Xmx2024m -Dhive.log.file=hive-metastore.log" hive \
 # Wait for the Metastore to come up because HiveServer2 relies on it being live.
 ${CLUSTER_BIN}/wait-for-metastore.py --transport=${METASTORE_TRANSPORT}
 
+# Include the latest libfesupport.so in the JAVA_LIBRARY_PATH
+export JAVA_LIBRARY_PATH="${JAVA_LIBRARY_PATH-}:${IMPALA_HOME}/be/build/latest/service/"
+
 if [ ${ONLY_METASTORE} -eq 0 ]; then
   # Starts a HiveServer2 instance on the port specified by the HIVE_SERVER2_THRIFT_PORT
   # environment variable. HADOOP_HEAPSIZE should be set to at least 2048 to avoid OOM