You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by to...@apache.org on 2018/08/09 21:48:11 UTC
[5/9] impala git commit: IMPALA-6857: Add Jvm pause/GC Monitor
utility and expose JMX metrics
IMPALA-6857: Add Jvm pause/GC Monitor utility and expose JMX metrics
Pause monitor:
=============
This commit adds a stripped down version of Hadoop's JvmPauseMonitor
class (https://bit.ly/2O6qSwm) . The core implementaion is borrowed
from hadoop-common project and the hadoop dependencies are removed.
- Removed dependency on AbstractService.
- Not relying on Hadoop's Configuration object for reading confs.
- Switched to Guava's implementation of Stopwatch.
This utility class can detect both GC/non-GC pauses. In case of GC
pauses, the GC metrics during the pause period are logged.
Sample Output:
=============
Detected pause in JVM or host machine (eg GC): pause of approximately
2356ms
GC pool 'PS MarkSweep' had collection(s): count=1 time=2241ms
GC pool 'PS Scavenge' had collection(s): count=3 time=352ms
Detected pause in JVM or host machine (eg GC): pause of approximately
1964ms
GC pool 'PS MarkSweep' had collection(s): count=1 time=2082ms
GC pool 'PS Scavenge' had collection(s): count=1 time=251ms
Detected pause in JVM or host machine (eg GC): pause of approximately
2120ms
GC pool 'PS MarkSweep' had collection(s): count=1 time=2454ms
Detected pause in JVM or host machine (eg GC): pause of approximately
2238ms
GC pool 'PS MarkSweep' had collection(s): count=5 time=13464ms
Detected pause in JVM or host machine (eg GC): pause of approximately
2233ms
GC pool 'PS MarkSweep' had collection(s): count=1 time=2733ms
JMX Metrics:
============
JMX metrics are now emmitted for Impala and Catalog JVMs at the web end
point /jmx.
- Impalad: http(s)://<impalad-host>:25000/jmx
- Catalogd: http(s)://<catalogd-host>:25020/jmx
Misc:
====
Renamed JvmMetric -> JvmMemoryMetric to make the intent more clear. It
doesn't relate to the functionality of the patch in anyway.
Testing:
=======
- Tested it manually with kill -SIGSTOP/-SIGCONT <pid>. Made sure that
the non-GC JVM pauses are logged.
- This class' functionality is tested manually by invoking it's main()
- Injected a memory leak into the Catalog server code and made sure the
GC is detected.
Change-Id: I30d897b7e063846ad6d8f88243e2c04264da0341
Reviewed-on: http://gerrit.cloudera.org:8080/10998
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>
Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/4976ff3c
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/4976ff3c
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/4976ff3c
Branch: refs/heads/master
Commit: 4976ff3c07f465915ac31312ca67519a600212e6
Parents: 5cb956e
Author: Bharath Vissapragada <bh...@cloudera.com>
Authored: Thu Jul 19 16:01:02 2018 -0700
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Thu Aug 9 04:33:59 2018 +0000
----------------------------------------------------------------------
be/src/common/init.cc | 1 +
be/src/service/impala-http-handler.cc | 4 +-
be/src/util/default-path-handlers.cc | 40 ++-
be/src/util/jni-util.cc | 32 ++-
be/src/util/jni-util.h | 20 +-
be/src/util/memory-metrics.cc | 62 ++--
be/src/util/memory-metrics.h | 27 +-
be/src/util/metrics-test.cc | 4 +-
be/src/util/webserver-test.cc | 4 +-
be/src/util/webserver.cc | 13 +-
be/src/util/webserver.h | 10 +-
common/thrift/Frontend.thrift | 11 +-
.../java/org/apache/impala/common/JniUtil.java | 57 ++--
.../org/apache/impala/util/JMXJsonUtil.java | 281 +++++++++++++++++++
.../org/apache/impala/util/JvmPauseMonitor.java | 205 ++++++++++++++
.../org/apache/impala/util/JMXJsonUtilTest.java | 56 ++++
.../org/apache/impala/util/JniUtilTest.java | 54 ++++
tests/custom_cluster/test_pause_monitor.py | 38 +++
tests/webserver/test_web_pages.py | 21 +-
19 files changed, 850 insertions(+), 90 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/impala/blob/4976ff3c/be/src/common/init.cc
----------------------------------------------------------------------
diff --git a/be/src/common/init.cc b/be/src/common/init.cc
index 2842c39..bc1065b 100644
--- a/be/src/common/init.cc
+++ b/be/src/common/init.cc
@@ -261,6 +261,7 @@ void impala::InitCommonRuntime(int argc, char** argv, bool init_jvm,
if (init_jvm) {
ABORT_IF_ERROR(JniUtil::Init());
InitJvmLoggingSupport();
+ ABORT_IF_ERROR(JniUtil::InitJvmPauseMonitor());
ZipUtil::InitJvm();
}
http://git-wip-us.apache.org/repos/asf/impala/blob/4976ff3c/be/src/service/impala-http-handler.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-http-handler.cc b/be/src/service/impala-http-handler.cc
index f7ca5d6..ea41084 100644
--- a/be/src/service/impala-http-handler.cc
+++ b/be/src/service/impala-http-handler.cc
@@ -240,7 +240,7 @@ void ImpalaHttpHandler::QueryProfileEncodedHandler(const Webserver::ArgumentMap&
ss.str(Substitute("Could not obtain runtime profile: $0", status.GetDetail()));
}
}
- document->AddMember(Webserver::ENABLE_RAW_JSON_KEY, true, document->GetAllocator());
+ document->AddMember(Webserver::ENABLE_RAW_HTML_KEY, true, document->GetAllocator());
Value profile(ss.str().c_str(), document->GetAllocator());
document->AddMember("contents", profile, document->GetAllocator());
}
@@ -252,7 +252,7 @@ void ImpalaHttpHandler::InflightQueryIdsHandler(const Webserver::ArgumentMap& ar
[&](const std::shared_ptr<ClientRequestState>& request_state) {
ss << PrintId(request_state->query_id()) << "\n";
});
- document->AddMember(Webserver::ENABLE_RAW_JSON_KEY, true, document->GetAllocator());
+ document->AddMember(Webserver::ENABLE_RAW_HTML_KEY, true, document->GetAllocator());
Value query_ids(ss.str().c_str(), document->GetAllocator());
document->AddMember("contents", query_ids, document->GetAllocator());
}
http://git-wip-us.apache.org/repos/asf/impala/blob/4976ff3c/be/src/util/default-path-handlers.cc
----------------------------------------------------------------------
diff --git a/be/src/util/default-path-handlers.cc b/be/src/util/default-path-handlers.cc
index 10966b4..9a50a79 100644
--- a/be/src/util/default-path-handlers.cc
+++ b/be/src/util/default-path-handlers.cc
@@ -26,6 +26,7 @@
#include <gutil/strings/substitute.h>
#include "common/logging.h"
+#include "rpc/jni-thrift-util.h"
#include "runtime/mem-tracker.h"
#include "runtime/exec-env.h"
#include "service/impala-server.h"
@@ -37,6 +38,7 @@
#include "util/cpu-info.h"
#include "util/disk-info.h"
#include "util/process-state-info.h"
+#include "util/jni-util.h"
#include "common/names.h"
@@ -192,6 +194,34 @@ void MemUsageHandler(MemTracker* mem_tracker, MetricGroup* metric_group,
}
}
+void JmxHandler(const Webserver::ArgumentMap& args, Document* document) {
+ document->AddMember(Webserver::ENABLE_PLAIN_JSON_KEY, true, document->GetAllocator());
+ TGetJMXJsonResponse result;
+ Status status = JniUtil::GetJMXJson(&result);
+ if (!status.ok()) {
+ Value error(status.GetDetail().c_str(), document->GetAllocator());
+ document->AddMember("error", error, document->GetAllocator());
+ VLOG(1) << "Error fetching JMX metrics: " << status.GetDetail();
+ return;
+ }
+ // Parse the JSON string returned from fe. We do an additional round of
+ // parsing to populate the JSON structure in the 'document' for our template
+ // rendering to work correctly. Otherwise the whole JSON content is considered
+ // as a single string mapped to another key.
+ Document doc(&document->GetAllocator());
+ doc.Parse<kParseDefaultFlags>(result.jmx_json.c_str());
+ if (doc.HasParseError()) {
+ Value error(doc.GetParseError(), document->GetAllocator());
+ document->AddMember("error", error, document->GetAllocator());
+ VLOG(1) << "Error fetching JMX metrics: " << doc.GetParseError();
+ return;
+ }
+ // Populate the members in the document.
+ for (Value::MemberIterator it = doc.MemberBegin(); it != doc.MemberEnd(); ++it) {
+ document->AddMember(it->name.GetString(), it->value, document->GetAllocator());
+ }
+}
+
namespace impala {
void RootHandler(const Webserver::ArgumentMap& args, Document* document) {
@@ -226,10 +256,16 @@ void RootHandler(const Webserver::ArgumentMap& args, Document* document) {
document->GetAllocator());
}
-void AddDefaultUrlCallbacks(
- Webserver* webserver, MemTracker* process_mem_tracker, MetricGroup* metric_group) {
+void AddDefaultUrlCallbacks(Webserver* webserver, MemTracker* process_mem_tracker,
+ MetricGroup* metric_group) {
webserver->RegisterUrlCallback("/logs", "logs.tmpl", LogsHandler);
webserver->RegisterUrlCallback("/varz", "flags.tmpl", FlagsHandler);
+ if (JniUtil::is_jvm_inited()) {
+ // JmxHandler outputs a plain JSON string and does not require a template to
+ // render. However RawUrlCallback only supports PLAIN content type.
+ // (TODO): Switch to RawUrlCallback when it supports JSON content-type.
+ webserver->RegisterUrlCallback("/jmx", "raw_text.tmpl", JmxHandler);
+ }
if (process_mem_tracker != NULL) {
auto callback = [process_mem_tracker, metric_group]
(const Webserver::ArgumentMap& args, Document* doc) {
http://git-wip-us.apache.org/repos/asf/impala/blob/4976ff3c/be/src/util/jni-util.cc
----------------------------------------------------------------------
diff --git a/be/src/util/jni-util.cc b/be/src/util/jni-util.cc
index db7893b..f47e25d 100644
--- a/be/src/util/jni-util.cc
+++ b/be/src/util/jni-util.cc
@@ -64,10 +64,12 @@ bool JniScopedArrayCritical::Create(JNIEnv* env, jbyteArray jarr,
return true;
}
+bool JniUtil::jvm_inited_ = false;
jclass JniUtil::jni_util_cl_ = NULL;
jclass JniUtil::internal_exc_cl_ = NULL;
jmethodID JniUtil::get_jvm_metrics_id_ = NULL;
jmethodID JniUtil::get_jvm_threads_id_ = NULL;
+jmethodID JniUtil::get_jmx_json_ = NULL;
jmethodID JniUtil::throwable_to_string_id_ = NULL;
jmethodID JniUtil::throwable_to_stack_trace_id_ = NULL;
@@ -176,10 +178,10 @@ Status JniUtil::Init() {
}
get_jvm_metrics_id_ =
- env->GetStaticMethodID(jni_util_cl_, "getJvmMetrics", "([B)[B");
+ env->GetStaticMethodID(jni_util_cl_, "getJvmMemoryMetrics", "([B)[B");
if (get_jvm_metrics_id_ == NULL) {
if (env->ExceptionOccurred()) env->ExceptionDescribe();
- return Status("Failed to find JniUtil.getJvmMetrics method.");
+ return Status("Failed to find JniUtil.getJvmMemoryMetrics method.");
}
get_jvm_threads_id_ =
@@ -189,6 +191,13 @@ Status JniUtil::Init() {
return Status("Failed to find JniUtil.getJvmThreadsInfo method.");
}
+ get_jmx_json_ =
+ env->GetStaticMethodID(jni_util_cl_, "getJMXJson", "()[B");
+ if (get_jmx_json_ == NULL) {
+ if (env->ExceptionOccurred()) env->ExceptionDescribe();
+ return Status("Failed to find JniUtil.getJMXJson method.");
+ }
+ jvm_inited_ = true;
return Status::OK();
}
@@ -199,6 +208,17 @@ void JniUtil::InitLibhdfs() {
hdfsDisconnect(fs);
}
+Status JniUtil::InitJvmPauseMonitor() {
+ JNIEnv* env = getJNIEnv();
+ if (!env) return Status("Failed to get/create JVM.");
+ if (!jni_util_cl_) return Status("JniUtil::Init() not called.");
+ jmethodID init_jvm_pm_method;
+ JniMethodDescriptor init_jvm_pm_desc = {"initPauseMonitor", "()V", &init_jvm_pm_method};
+ RETURN_IF_ERROR(JniUtil::LoadStaticJniMethod(env, jni_util_cl_, &init_jvm_pm_desc));
+ RETURN_IF_ERROR(JniUtil::CallJniMethod(jni_util_cl_, init_jvm_pm_method));
+ return Status::OK();
+}
+
Status JniUtil::GetJniExceptionMsg(JNIEnv* env, bool log_stack, const string& prefix) {
jthrowable exc = env->ExceptionOccurred();
if (exc == nullptr) return Status::OK();
@@ -234,8 +254,8 @@ Status JniUtil::GetJniExceptionMsg(JNIEnv* env, bool log_stack, const string& pr
return Status(Substitute("$0$1", prefix, msg_str_guard.get()));
}
-Status JniUtil::GetJvmMetrics(const TGetJvmMetricsRequest& request,
- TGetJvmMetricsResponse* result) {
+Status JniUtil::GetJvmMemoryMetrics(const TGetJvmMemoryMetricsRequest& request,
+ TGetJvmMemoryMetricsResponse* result) {
return JniUtil::CallJniMethod(jni_util_class(), get_jvm_metrics_id_, request, result);
}
@@ -244,6 +264,10 @@ Status JniUtil::GetJvmThreadsInfo(const TGetJvmThreadsInfoRequest& request,
return JniUtil::CallJniMethod(jni_util_class(), get_jvm_threads_id_, request, result);
}
+Status JniUtil::GetJMXJson(TGetJMXJsonResponse* result) {
+ return JniUtil::CallJniMethod(jni_util_class(), get_jmx_json_, result);
+}
+
Status JniUtil::LoadJniMethod(JNIEnv* env, const jclass& jni_class,
JniMethodDescriptor* descriptor) {
(*descriptor->method_id) = env->GetMethodID(jni_class,
http://git-wip-us.apache.org/repos/asf/impala/blob/4976ff3c/be/src/util/jni-util.h
----------------------------------------------------------------------
diff --git a/be/src/util/jni-util.h b/be/src/util/jni-util.h
index f0afb66..0176a13 100644
--- a/be/src/util/jni-util.h
+++ b/be/src/util/jni-util.h
@@ -219,6 +219,9 @@ class JniUtil {
/// Find JniUtil class, and get JniUtil.throwableToString method id
static Status Init() WARN_UNUSED_RESULT;
+ /// Initializes the JvmPauseMonitor.
+ static Status InitJvmPauseMonitor() WARN_UNUSED_RESULT;
+
/// Returns true if the given class could be found on the CLASSPATH in env.
/// Returns false otherwise, or if any other error occurred (e.g. a JNI exception).
/// This function does not log any errors or exceptions.
@@ -264,6 +267,9 @@ class JniUtil {
static jmethodID throwable_to_string_id() { return throwable_to_string_id_; }
static jmethodID throwable_to_stack_trace_id() { return throwable_to_stack_trace_id_; }
+ /// Returns true if an embedded JVM is initialized, false otherwise.
+ static bool is_jvm_inited() { return jvm_inited_; }
+
/// Global reference to java JniUtil class
static jclass jni_util_class() { return jni_util_cl_; }
@@ -278,14 +284,17 @@ class JniUtil {
/// Populates 'result' with a list of memory metrics from the Jvm. Returns Status::OK
/// unless there is an exception.
- static Status GetJvmMetrics(const TGetJvmMetricsRequest& request,
- TGetJvmMetricsResponse* result) WARN_UNUSED_RESULT;
+ static Status GetJvmMemoryMetrics(const TGetJvmMemoryMetricsRequest& request,
+ TGetJvmMemoryMetricsResponse* result) WARN_UNUSED_RESULT;
- // Populates 'result' with information about live JVM threads. Returns
- // Status::OK unless there is an exception.
+ /// Populates 'result' with information about live JVM threads. Returns
+ /// Status::OK unless there is an exception.
static Status GetJvmThreadsInfo(const TGetJvmThreadsInfoRequest& request,
TGetJvmThreadsInfoResponse* result) WARN_UNUSED_RESULT;
+ /// Gets JMX metrics of the JVM encoded as a JSON string.
+ static Status GetJMXJson(TGetJMXJsonResponse* result) WARN_UNUSED_RESULT;
+
/// Loads a method whose signature is in the supplied descriptor. Returns Status::OK
/// and sets descriptor->method_id to a JNI method handle if successful, otherwise an
/// error status is returned.
@@ -367,12 +376,15 @@ class JniUtil {
}
private:
+ // Set in Init() once the JVM is initialized.
+ static bool jvm_inited_;
static jclass jni_util_cl_;
static jclass internal_exc_cl_;
static jmethodID throwable_to_string_id_;
static jmethodID throwable_to_stack_trace_id_;
static jmethodID get_jvm_metrics_id_;
static jmethodID get_jvm_threads_id_;
+ static jmethodID get_jmx_json_;
};
}
http://git-wip-us.apache.org/repos/asf/impala/blob/4976ff3c/be/src/util/memory-metrics.cc
----------------------------------------------------------------------
diff --git a/be/src/util/memory-metrics.cc b/be/src/util/memory-metrics.cc
index 390bd7c..04b94ef 100644
--- a/be/src/util/memory-metrics.cc
+++ b/be/src/util/memory-metrics.cc
@@ -118,7 +118,7 @@ Status impala::RegisterMemoryMetrics(MetricGroup* metrics, bool register_jvm_met
AggregateMemoryMetrics::TOTAL_USED = aggregate_metrics->RegisterMetric(
new SumGauge(MetricDefs::Get("memory.total-used"), used_metrics));
if (register_jvm_metrics) {
- RETURN_IF_ERROR(JvmMetric::InitMetrics(metrics->GetOrCreateChildGroup("jvm")));
+ RETURN_IF_ERROR(JvmMemoryMetric::InitMetrics(metrics->GetOrCreateChildGroup("jvm")));
}
if (FLAGS_enable_extended_memory_metrics && MemInfo::HaveSmaps()) {
@@ -158,53 +158,57 @@ void AggregateMemoryMetrics::Refresh() {
THP_KHUGEPAGED_DEFRAG->SetValue(thp_config.khugepaged_defrag);
}
-JvmMetric* JvmMetric::CreateAndRegister(MetricGroup* metrics, const string& key,
- const string& pool_name, JvmMetric::JvmMetricType type) {
+JvmMemoryMetric* JvmMemoryMetric::CreateAndRegister(
+ MetricGroup* metrics, const string& key, const string& pool_name,
+ JvmMemoryMetric::JvmMemoryMetricType type) {
string pool_name_for_key = pool_name;
to_lower(pool_name_for_key);
replace(pool_name_for_key.begin(), pool_name_for_key.end(), ' ', '-');
- return metrics->RegisterMetric(new JvmMetric(MetricDefs::Get(key, pool_name_for_key),
- pool_name, type));
+ return metrics->RegisterMetric(
+ new JvmMemoryMetric(MetricDefs::Get(key, pool_name_for_key), pool_name, type));
}
-JvmMetric::JvmMetric(const TMetricDef& def, const string& mempool_name,
- JvmMetricType type) : IntGauge(def, 0) {
+JvmMemoryMetric::JvmMemoryMetric(const TMetricDef& def, const string& mempool_name,
+ JvmMemoryMetricType type) : IntGauge(def, 0) {
mempool_name_ = mempool_name;
metric_type_ = type;
}
-Status JvmMetric::InitMetrics(MetricGroup* metrics) {
+
+Status JvmMemoryMetric::InitMetrics(MetricGroup* metrics) {
DCHECK(metrics != nullptr);
- TGetJvmMetricsRequest request;
+ TGetJvmMemoryMetricsRequest request;
request.get_all = true;
- TGetJvmMetricsResponse response;
- RETURN_IF_ERROR(JniUtil::GetJvmMetrics(request, &response));
+ TGetJvmMemoryMetricsResponse response;
+ RETURN_IF_ERROR(JniUtil::GetJvmMemoryMetrics(request, &response));
for (const TJvmMemoryPool& usage: response.memory_pools) {
- JvmMetric::CreateAndRegister(metrics, "jvm.$0.max-usage-bytes", usage.name, MAX);
- JvmMetric::CreateAndRegister(metrics, "jvm.$0.current-usage-bytes", usage.name,
- CURRENT);
- JvmMetric::CreateAndRegister(metrics, "jvm.$0.committed-usage-bytes", usage.name,
- COMMITTED);
- JvmMetric::CreateAndRegister(metrics, "jvm.$0.init-usage-bytes", usage.name, INIT);
- JvmMetric::CreateAndRegister(metrics, "jvm.$0.peak-max-usage-bytes", usage.name,
+ JvmMemoryMetric::CreateAndRegister(
+ metrics, "jvm.$0.max-usage-bytes", usage.name, MAX);
+ JvmMemoryMetric::CreateAndRegister(
+ metrics, "jvm.$0.current-usage-bytes", usage.name, CURRENT);
+ JvmMemoryMetric::CreateAndRegister(
+ metrics, "jvm.$0.committed-usage-bytes", usage.name, COMMITTED);
+ JvmMemoryMetric::CreateAndRegister(
+ metrics, "jvm.$0.init-usage-bytes", usage.name, INIT);
+ JvmMemoryMetric::CreateAndRegister(metrics, "jvm.$0.peak-max-usage-bytes", usage.name,
PEAK_MAX);
- JvmMetric::CreateAndRegister(metrics, "jvm.$0.peak-current-usage-bytes", usage.name,
- PEAK_CURRENT);
- JvmMetric::CreateAndRegister(metrics, "jvm.$0.peak-committed-usage-bytes", usage.name,
- PEAK_COMMITTED);
- JvmMetric::CreateAndRegister(metrics, "jvm.$0.peak-init-usage-bytes", usage.name,
- PEAK_INIT);
+ JvmMemoryMetric::CreateAndRegister(
+ metrics, "jvm.$0.peak-current-usage-bytes", usage.name, PEAK_CURRENT);
+ JvmMemoryMetric::CreateAndRegister(
+ metrics, "jvm.$0.peak-committed-usage-bytes", usage.name, PEAK_COMMITTED);
+ JvmMemoryMetric::CreateAndRegister(
+ metrics, "jvm.$0.peak-init-usage-bytes", usage.name, PEAK_INIT);
}
return Status::OK();
}
-int64_t JvmMetric::GetValue() {
- TGetJvmMetricsRequest request;
+int64_t JvmMemoryMetric::GetValue() {
+ TGetJvmMemoryMetricsRequest request;
request.get_all = false;
request.__set_memory_pool(mempool_name_);
- TGetJvmMetricsResponse response;
- if (!JniUtil::GetJvmMetrics(request, &response).ok()) return 0;
+ TGetJvmMemoryMetricsResponse response;
+ if (!JniUtil::GetJvmMemoryMetrics(request, &response).ok()) return 0;
if (response.memory_pools.size() != 1) return 0;
TJvmMemoryPool& pool = response.memory_pools[0];
DCHECK(pool.name == mempool_name_);
@@ -226,7 +230,7 @@ int64_t JvmMetric::GetValue() {
case PEAK_COMMITTED:
return pool.peak_committed;
default:
- DCHECK(false) << "Unknown JvmMetricType: " << metric_type_;
+ DCHECK(false) << "Unknown JvmMemoryMetricType: " << metric_type_;
}
return 0;
}
http://git-wip-us.apache.org/repos/asf/impala/blob/4976ff3c/be/src/util/memory-metrics.h
----------------------------------------------------------------------
diff --git a/be/src/util/memory-metrics.h b/be/src/util/memory-metrics.h
index ed0e889..b2a2fdc 100644
--- a/be/src/util/memory-metrics.h
+++ b/be/src/util/memory-metrics.h
@@ -152,14 +152,14 @@ class SanitizerMallocMetric : public IntGauge {
}
};
-/// A JvmMetric corresponds to one value drawn from one 'memory pool' in the JVM. A memory
-/// pool is an area of memory assigned for one particular aspect of memory management. For
-/// example Hotspot has pools for the permanent generation, the old generation, survivor
-/// space, code cache and permanently tenured objects.
-class JvmMetric : public IntGauge {
+/// A JvmMemoryMetric corresponds to one value drawn from one 'memory pool' in the JVM. A
+/// memory pool is an area of memory assigned for one particular aspect of memory
+/// management. For example Hotspot has pools for the permanent generation, the old
+/// generation, survivor space, code cache and permanently tenured objects.
+class JvmMemoryMetric : public IntGauge {
public:
- /// Registers many Jvm memory metrics: one for every member of JvmMetricType for each
- /// pool (usually ~5 pools plus a synthetic 'total' pool).
+ /// Registers many Jvm memory metrics: one for every member of JvmMemoryMetricType for
+ /// each pool (usually ~5 pools plus a synthetic 'total' pool).
static Status InitMetrics(MetricGroup* metrics) WARN_UNUSED_RESULT;
/// Searches through jvm_metrics_response_ for a matching memory pool and pulls out the
@@ -168,7 +168,7 @@ class JvmMetric : public IntGauge {
private:
/// Each names one of the fields in TJvmMemoryPool.
- enum JvmMetricType {
+ enum JvmMemoryMetricType {
MAX,
INIT,
COMMITTED,
@@ -179,18 +179,19 @@ class JvmMetric : public IntGauge {
PEAK_CURRENT
};
- static JvmMetric* CreateAndRegister(MetricGroup* metrics, const std::string& key,
- const std::string& pool_name, JvmMetric::JvmMetricType type);
+ static JvmMemoryMetric* CreateAndRegister(MetricGroup* metrics, const std::string& key,
+ const std::string& pool_name, JvmMemoryMetric::JvmMemoryMetricType type);
- /// Private constructor to ensure only InitMetrics() can create JvmMetrics.
- JvmMetric(const TMetricDef& def, const std::string& mempool_name, JvmMetricType type);
+ /// Private constructor to ensure only InitMetrics() can create JvmMemoryMetrics.
+ JvmMemoryMetric(
+ const TMetricDef& def, const std::string& mempool_name, JvmMemoryMetricType type);
/// The name of the memory pool, defined by the Jvm.
std::string mempool_name_;
/// Each metric corresponds to one value; this tells us which value from the memory pool
/// that is.
- JvmMetricType metric_type_;
+ JvmMemoryMetricType metric_type_;
};
/// Metric that reports information about the buffer pool.
http://git-wip-us.apache.org/repos/asf/impala/blob/4976ff3c/be/src/util/metrics-test.cc
----------------------------------------------------------------------
diff --git a/be/src/util/metrics-test.cc b/be/src/util/metrics-test.cc
index bfbfdfe..9aa50e9 100644
--- a/be/src/util/metrics-test.cc
+++ b/be/src/util/metrics-test.cc
@@ -247,8 +247,8 @@ TEST_F(MetricsTest, MemMetric) {
#endif
}
-TEST_F(MetricsTest, JvmMetrics) {
- MetricGroup metrics("JvmMetrics");
+TEST_F(MetricsTest, JvmMemoryMetrics) {
+ MetricGroup metrics("JvmMemoryMetrics");
ASSERT_OK(RegisterMemoryMetrics(&metrics, true, nullptr, nullptr));
IntGauge* jvm_total_used =
metrics.GetOrCreateChildGroup("jvm")->FindMetricForTesting<IntGauge>(
http://git-wip-us.apache.org/repos/asf/impala/blob/4976ff3c/be/src/util/webserver-test.cc
----------------------------------------------------------------------
diff --git a/be/src/util/webserver-test.cc b/be/src/util/webserver-test.cc
index 78934c6..7132b74 100644
--- a/be/src/util/webserver-test.cc
+++ b/be/src/util/webserver-test.cc
@@ -135,7 +135,7 @@ void JsonCallback(bool always_text, const Webserver::ArgumentMap& args,
document->AddMember(TO_ESCAPE_KEY.c_str(), TO_ESCAPE_VALUE.c_str(),
document->GetAllocator());
if (always_text) {
- document->AddMember(Webserver::ENABLE_RAW_JSON_KEY, true, document->GetAllocator());
+ document->AddMember(Webserver::ENABLE_RAW_HTML_KEY, true, document->GetAllocator());
}
}
@@ -174,7 +174,7 @@ TEST(Webserver, JsonTest) {
Substitute("$0?raw", JSON_TEST_PATH), &raw_contents));
ASSERT_TRUE(raw_contents.str().find("text/plain") != string::npos);
- // Any callback that includes ENABLE_RAW_JSON_KEY should always return text.
+ // Any callback that includes ENABLE_RAW_HTML_KEY should always return text.
stringstream raw_cb_contents;
ASSERT_OK(HttpGet("localhost", FLAGS_webserver_port, RAW_TEXT_PATH,
&raw_cb_contents));
http://git-wip-us.apache.org/repos/asf/impala/blob/4976ff3c/be/src/util/webserver.cc
----------------------------------------------------------------------
diff --git a/be/src/util/webserver.cc b/be/src/util/webserver.cc
index c8307b5..d5d9732 100644
--- a/be/src/util/webserver.cc
+++ b/be/src/util/webserver.cc
@@ -137,7 +137,9 @@ const char* GetDefaultDocumentRoot() {
namespace impala {
-const char* Webserver::ENABLE_RAW_JSON_KEY = "__raw__";
+const char* Webserver::ENABLE_RAW_HTML_KEY = "__raw__";
+
+const char* Webserver::ENABLE_PLAIN_JSON_KEY = "__json__";
// Supported HTTP response codes
enum ResponseCode {
@@ -435,9 +437,10 @@ void Webserver::RenderUrlWithTemplate(const ArgumentMap& arguments,
document.SetObject();
GetCommonJson(&document);
- bool raw_json = (arguments.find("json") != arguments.end());
url_handler.callback()(arguments, &document);
- if (raw_json) {
+ bool plain_json = (arguments.find("json") != arguments.end())
+ || document.HasMember(ENABLE_PLAIN_JSON_KEY);
+ if (plain_json) {
// Callbacks may optionally be rendered as a text-only, pretty-printed Json document
// (mostly for debugging or integration with third-party tools).
StringBuffer strbuf;
@@ -447,9 +450,9 @@ void Webserver::RenderUrlWithTemplate(const ArgumentMap& arguments,
*content_type = JSON;
} else {
if (arguments.find("raw") != arguments.end()) {
- document.AddMember(ENABLE_RAW_JSON_KEY, "true", document.GetAllocator());
+ document.AddMember(ENABLE_RAW_HTML_KEY, "true", document.GetAllocator());
}
- if (document.HasMember(ENABLE_RAW_JSON_KEY)) {
+ if (document.HasMember(ENABLE_RAW_HTML_KEY)) {
*content_type = PLAIN;
}
http://git-wip-us.apache.org/repos/asf/impala/blob/4976ff3c/be/src/util/webserver.h
----------------------------------------------------------------------
diff --git a/be/src/util/webserver.h b/be/src/util/webserver.h
index c060d5e..1651d88 100644
--- a/be/src/util/webserver.h
+++ b/be/src/util/webserver.h
@@ -54,10 +54,16 @@ class Webserver {
typedef boost::function<void (const ArgumentMap& args, std::stringstream* output)>
RawUrlCallback;
- /// Any callback may add a member to their Json output with key ENABLE_RAW_JSON_KEY;
+ /// Any callback may add a member to their Json output with key ENABLE_RAW_HTML_KEY;
/// this causes the result of the template rendering process to be sent to the browser
/// as text, not HTML.
- static const char* ENABLE_RAW_JSON_KEY;
+ static const char* ENABLE_RAW_HTML_KEY;
+
+ /// Any callback may add a member to their Json output with key ENABLE_PLAIN_JSON_KEY;
+ /// this causes the result of the template rendering process to be sent to the browser
+ /// as pretty printed JSON plain text.
+ static const char* ENABLE_PLAIN_JSON_KEY;
+
/// Using this constructor, the webserver will bind to all available interfaces.
Webserver(const int port);
http://git-wip-us.apache.org/repos/asf/impala/blob/4976ff3c/common/thrift/Frontend.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/Frontend.thrift b/common/thrift/Frontend.thrift
index 1245c94..abb2d77 100644
--- a/common/thrift/Frontend.thrift
+++ b/common/thrift/Frontend.thrift
@@ -758,7 +758,7 @@ struct TJvmMemoryPool {
}
// Request to get one or all sets of memory pool metrics.
-struct TGetJvmMetricsRequest {
+struct TGetJvmMemoryMetricsRequest {
// If set, return all pools
1: required bool get_all
@@ -766,8 +766,8 @@ struct TGetJvmMetricsRequest {
2: optional string memory_pool
}
-// Response from JniUtil::GetJvmMetrics()
-struct TGetJvmMetricsResponse {
+// Response from JniUtil::GetJvmMemoryMetrics()
+struct TGetJvmMemoryMetricsResponse {
// One entry for every pool tracked by the Jvm, plus a synthetic aggregate pool called
// 'total'
1: required list<TJvmMemoryPool> memory_pools
@@ -818,6 +818,11 @@ struct TGetJvmThreadsInfoResponse {
4: optional list<TJvmThreadInfo> threads
}
+struct TGetJMXJsonResponse {
+ // JMX of the JVM serialized to a json string.
+ 1: required string jmx_json
+}
+
struct TGetHadoopConfigRequest {
// The value of the <name> in the config <property>
1: required string name
http://git-wip-us.apache.org/repos/asf/impala/blob/4976ff3c/fe/src/main/java/org/apache/impala/common/JniUtil.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/common/JniUtil.java b/fe/src/main/java/org/apache/impala/common/JniUtil.java
index 6aec0d4..348b76d 100644
--- a/fe/src/main/java/org/apache/impala/common/JniUtil.java
+++ b/fe/src/main/java/org/apache/impala/common/JniUtil.java
@@ -17,10 +17,10 @@
package org.apache.impala.common;
-import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.io.Writer;
+import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
import java.lang.management.MemoryPoolMXBean;
@@ -31,6 +31,8 @@ import java.lang.management.ThreadInfo;
import java.util.ArrayList;
import java.util.Map;
+import org.apache.impala.thrift.TGetJMXJsonResponse;
+import org.apache.impala.util.JMXJsonUtil;
import org.apache.thrift.TBase;
import org.apache.thrift.TSerializer;
import org.apache.thrift.TDeserializer;
@@ -39,14 +41,17 @@ import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocolFactory;
import com.google.common.base.Joiner;
+import com.google.common.base.Strings;
-import org.apache.impala.thrift.TGetJvmMetricsRequest;
-import org.apache.impala.thrift.TGetJvmMetricsResponse;
+import org.apache.impala.thrift.TGetJvmMemoryMetricsRequest;
+import org.apache.impala.thrift.TGetJvmMemoryMetricsResponse;
import org.apache.impala.thrift.TGetJvmThreadsInfoRequest;
import org.apache.impala.thrift.TGetJvmThreadsInfoResponse;
import org.apache.impala.thrift.TJvmMemoryPool;
import org.apache.impala.thrift.TJvmThreadInfo;
+import org.apache.impala.util.JvmPauseMonitor;
+import org.apache.log4j.Logger;
/**
* Utility class with methods intended for JNI clients
*/
@@ -54,6 +59,15 @@ public class JniUtil {
private final static TBinaryProtocol.Factory protocolFactory_ =
new TBinaryProtocol.Factory();
+ private static final Logger LOG = Logger.getLogger(JniUtil.class);
+
+ /**
+ * Initializes the JvmPauseMonitor instance.
+ */
+ public static void initPauseMonitor() {
+ JvmPauseMonitor.INSTANCE.initPauseMonitor();
+ }
+
/**
* Returns a formatted string containing the simple exception name and the
* exception message without the full stack trace. Includes the
@@ -82,6 +96,18 @@ public class JniUtil {
}
/**
+ * Serializes input into a byte[] using a given protocol factory.
+ */
+ public static <T extends TBase<?, ?>, F extends TProtocolFactory>
+ byte[] serializeToThrift(T input, F protocolFactory) throws ImpalaException {
+ TSerializer serializer = new TSerializer(protocolFactory);
+ try {
+ return serializer.serialize(input);
+ } catch (TException e) {
+ throw new InternalException(e.getMessage());
+ }
+ }
+ /**
* Deserialize a serialized form of a Thrift data structure to its object form.
*/
public static <T extends TBase<?, ?>, F extends TProtocolFactory>
@@ -101,11 +127,11 @@ public class JniUtil {
* Impala metrics by the backend. A synthetic 'total' memory pool is included with
* aggregate statistics for all real pools.
*/
- public static byte[] getJvmMetrics(byte[] argument) throws ImpalaException {
- TGetJvmMetricsRequest request = new TGetJvmMetricsRequest();
+ public static byte[] getJvmMemoryMetrics(byte[] argument) throws ImpalaException {
+ TGetJvmMemoryMetricsRequest request = new TGetJvmMemoryMetricsRequest();
JniUtil.deserializeThrift(protocolFactory_, request, argument);
- TGetJvmMetricsResponse jvmMetrics = new TGetJvmMetricsResponse();
+ TGetJvmMemoryMetricsResponse jvmMetrics = new TGetJvmMemoryMetricsResponse();
jvmMetrics.setMemory_pools(new ArrayList<TJvmMemoryPool>());
TJvmMemoryPool totalUsage = new TJvmMemoryPool();
boolean is_total =
@@ -182,13 +208,7 @@ public class JniUtil {
nonHeap.setPeak_used(0);
jvmMetrics.getMemory_pools().add(nonHeap);
}
-
- TSerializer serializer = new TSerializer(protocolFactory_);
- try {
- return serializer.serialize(jvmMetrics);
- } catch (TException e) {
- throw new InternalException(e.getMessage());
- }
+ return serializeToThrift(jvmMetrics, protocolFactory_);
}
/**
@@ -215,13 +235,12 @@ public class JniUtil {
response.addToThreads(tThreadInfo);
}
}
+ return serializeToThrift(response, protocolFactory_);
+ }
- TSerializer serializer = new TSerializer(protocolFactory_);
- try {
- return serializer.serialize(response);
- } catch (TException e) {
- throw new InternalException(e.getMessage());
- }
+ public static byte[] getJMXJson() throws ImpalaException {
+ TGetJMXJsonResponse response = new TGetJMXJsonResponse(JMXJsonUtil.getJMXJson());
+ return serializeToThrift(response, protocolFactory_);
}
/**
http://git-wip-us.apache.org/repos/asf/impala/blob/4976ff3c/fe/src/main/java/org/apache/impala/util/JMXJsonUtil.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/util/JMXJsonUtil.java b/fe/src/main/java/org/apache/impala/util/JMXJsonUtil.java
new file mode 100644
index 0000000..faaba23
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/util/JMXJsonUtil.java
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.impala.util;
+
+import org.apache.log4j.Logger;
+import org.codehaus.jackson.JsonFactory;
+import org.codehaus.jackson.JsonGenerator;
+
+import javax.management.AttributeNotFoundException;
+import javax.management.InstanceNotFoundException;
+import javax.management.IntrospectionException;
+import javax.management.MBeanAttributeInfo;
+import javax.management.MBeanException;
+import javax.management.MBeanInfo;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+import javax.management.ReflectionException;
+import javax.management.RuntimeErrorException;
+import javax.management.RuntimeMBeanException;
+import javax.management.openmbean.CompositeData;
+import javax.management.openmbean.CompositeType;
+import javax.management.openmbean.TabularData;
+import java.io.IOException;
+import java.io.StringWriter;
+import java.lang.management.ManagementFactory;
+import java.lang.reflect.Array;
+import java.util.Iterator;
+import java.util.Set;
+
+/**
+ * Utility class that returns a JSON representation of the JMX beans.
+ * This is based on hadoop-common's implementation of JMXJsonServlet.
+ *
+ * Output format:
+ * {
+ * "beans" : [
+ * {
+ * "name":"bean-name"
+ * ...
+ * }
+ * ]
+ * }
+ * Each bean's attributes will be converted to a JSON object member.
+ * If the attribute is a boolean, a number, a string, or an array
+ * it will be converted to the JSON equivalent.
+ *
+ * If the value is a {@link CompositeData} then it will be converted
+ * to a JSON object with the keys as the name of the JSON member and
+ * the value is converted following these same rules.
+ * If the value is a {@link TabularData} then it will be converted
+ * to an array of the {@link CompositeData} elements that it contains.
+ * All other objects will be converted to a string and output as such.
+ * The bean's name and modelerType will be returned for all beans.
+ *
+ */
+public class JMXJsonUtil {
+ // MBean server instance
+ protected static transient MBeanServer mBeanServer =
+ ManagementFactory.getPlatformMBeanServer();
+
+ private static final Logger LOG = Logger.getLogger(JMXJsonUtil.class);
+
+ // Returns the JMX beans as a JSON string.
+ public static String getJMXJson() {
+ StringWriter writer = new StringWriter();
+ try {
+ JsonGenerator jg = null;
+ try {
+ JsonFactory jsonFactory = new JsonFactory();
+ jg = jsonFactory.createJsonGenerator(writer);
+ jg.disable(JsonGenerator.Feature.AUTO_CLOSE_TARGET);
+ jg.writeStartObject();
+ if (mBeanServer == null) {
+ jg.writeStringField("result", "ERROR");
+ jg.writeStringField("message", "No MBeanServer could be found");
+ jg.close();
+ LOG.error("No MBeanServer could be found.");
+ return writer.toString();
+ }
+ listBeans(jg);
+ } finally {
+ if (jg != null) {
+ jg.close();
+ }
+ if (writer != null) {
+ writer.close();
+ }
+ }
+ } catch ( IOException e ) {
+ LOG.error("Caught an exception while processing JMX request", e);
+ }
+ return writer.toString();
+ }
+
+ // Utility method that lists all the mbeans and write them using the supplied
+ // JsonGenerator.
+ private static void listBeans(JsonGenerator jg) throws IOException {
+ Set<ObjectName> names;
+ names = mBeanServer.queryNames(null, null);
+ jg.writeArrayFieldStart("beans");
+ Iterator<ObjectName> it = names.iterator();
+ while (it.hasNext()) {
+ ObjectName oname = it.next();
+ MBeanInfo minfo;
+ String code = "";
+ Object attributeinfo = null;
+ try {
+ minfo = mBeanServer.getMBeanInfo(oname);
+ code = minfo.getClassName();
+ String prs = "";
+ try {
+ if ("org.apache.commons.modeler.BaseModelMBean".equals(code)) {
+ prs = "modelerType";
+ code = (String) mBeanServer.getAttribute(oname, prs);
+ }
+ } catch (AttributeNotFoundException e) {
+ // If the modelerType attribute was not found, the class name is used
+ // instead.
+ LOG.error("getting attribute " + prs + " of " + oname
+ + " threw an exception", e);
+ } catch (MBeanException e) {
+ // The code inside the attribute getter threw an exception so log it,
+ // and fall back on the class name
+ LOG.error("getting attribute " + prs + " of " + oname
+ + " threw an exception", e);
+ } catch (RuntimeException e) {
+ // For some reason even with an MBeanException available to them
+ // Runtime exceptionscan still find their way through, so treat them
+ // the same as MBeanException
+ LOG.error("getting attribute " + prs + " of " + oname
+ + " threw an exception", e);
+ } catch ( ReflectionException e ) {
+ // This happens when the code inside the JMX bean (setter?? from the
+ // java docs) threw an exception, so log it and fall back on the
+ // class name
+ LOG.error("getting attribute " + prs + " of " + oname
+ + " threw an exception", e);
+ }
+ } catch (InstanceNotFoundException e) {
+ //Ignored for some reason the bean was not found so don't output it
+ continue;
+ } catch ( IntrospectionException | ReflectionException e ) {
+ // This is an internal error, something odd happened with reflection so
+ // log it and don't output the bean.
+ LOG.error("Problem while trying to process JMX query with MBean " + oname, e);
+ continue;
+ }
+ jg.writeStartObject();
+ jg.writeStringField("name", oname.toString());
+ jg.writeStringField("modelerType", code);
+ MBeanAttributeInfo attrs[] = minfo.getAttributes();
+ for (int i = 0; i < attrs.length; i++) {
+ writeAttribute(jg, oname, attrs[i]);
+ }
+ jg.writeEndObject();
+ }
+ jg.writeEndArray();
+ }
+
+ // Utility method to write mBean attributes.
+ private static void writeAttribute(JsonGenerator jg, ObjectName oname,
+ MBeanAttributeInfo attr) throws IOException {
+ if (!attr.isReadable()) {
+ return;
+ }
+ String attName = attr.getName();
+ if ("modelerType".equals(attName)) {
+ return;
+ }
+ if (attName.indexOf("=") >= 0 || attName.indexOf(":") >= 0
+ || attName.indexOf(" ") >= 0) {
+ return;
+ }
+ Object value = null;
+ try {
+ value = mBeanServer.getAttribute(oname, attName);
+ } catch (RuntimeMBeanException e) {
+ // UnsupportedOperationExceptions happen in the normal course of business,
+ // so no need to log them as errors all the time.
+ if (e.getCause() instanceof UnsupportedOperationException) {
+ LOG.trace("getting attribute "+attName+" of "+oname+" threw an exception", e);
+ } else {
+ LOG.error("getting attribute "+attName+" of "+oname+" threw an exception", e);
+ }
+ return;
+ } catch (RuntimeErrorException e) {
+ // RuntimeErrorException happens when an unexpected failure occurs in getAttribute
+ // for example https://issues.apache.org/jira/browse/DAEMON-120
+ LOG.debug("getting attribute "+attName+" of "+oname+" threw an exception", e);
+ return;
+ } catch (AttributeNotFoundException e) {
+ //Ignored the attribute was not found, which should never happen because the bean
+ //just told us that it has this attribute, but if this happens just don't output
+ //the attribute.
+ return;
+ } catch (MBeanException e) {
+ //The code inside the attribute getter threw an exception so log it, and
+ // skip outputting the attribute
+ LOG.error("getting attribute "+attName+" of "+oname+" threw an exception", e);
+ return;
+ } catch (RuntimeException e) {
+ //For some reason even with an MBeanException available to them Runtime exceptions
+ //can still find their way through, so treat them the same as MBeanException
+ LOG.error("getting attribute "+attName+" of "+oname+" threw an exception", e);
+ return;
+ } catch (ReflectionException e) {
+ //This happens when the code inside the JMX bean (setter?? from the java docs)
+ //threw an exception, so log it and skip outputting the attribute
+ LOG.error("getting attribute "+attName+" of "+oname+" threw an exception", e);
+ return;
+ } catch (InstanceNotFoundException e) {
+ //Ignored the mbean itself was not found, which should never happen because we
+ //just accessed it (perhaps something unregistered in-between) but if this
+ //happens just don't output the attribute.
+ return;
+ }
+ writeAttribute(jg, attName, value);
+ }
+
+ private static void writeAttribute(JsonGenerator jg, String attName, Object value)
+ throws IOException {
+ jg.writeFieldName(attName);
+ writeObject(jg, value);
+ }
+
+ private static void writeObject(JsonGenerator jg, Object value) throws IOException {
+ if(value == null) {
+ jg.writeNull();
+ } else {
+ Class<?> c = value.getClass();
+ if (c.isArray()) {
+ jg.writeStartArray();
+ int len = Array.getLength(value);
+ for (int j = 0; j < len; j++) {
+ Object item = Array.get(value, j);
+ writeObject(jg, item);
+ }
+ jg.writeEndArray();
+ } else if(value instanceof Number) {
+ Number n = (Number)value;
+ jg.writeNumber(n.toString());
+ } else if(value instanceof Boolean) {
+ Boolean b = (Boolean)value;
+ jg.writeBoolean(b);
+ } else if(value instanceof CompositeData) {
+ CompositeData cds = (CompositeData)value;
+ CompositeType comp = cds.getCompositeType();
+ Set<String> keys = comp.keySet();
+ jg.writeStartObject();
+ for(String key: keys) {
+ writeAttribute(jg, key, cds.get(key));
+ }
+ jg.writeEndObject();
+ } else if(value instanceof TabularData) {
+ TabularData tds = (TabularData)value;
+ jg.writeStartArray();
+ for(Object entry : tds.values()) {
+ writeObject(jg, entry);
+ }
+ jg.writeEndArray();
+ } else {
+ jg.writeString(value.toString());
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/impala/blob/4976ff3c/fe/src/main/java/org/apache/impala/util/JvmPauseMonitor.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/util/JvmPauseMonitor.java b/fe/src/main/java/org/apache/impala/util/JvmPauseMonitor.java
new file mode 100644
index 0000000..e4f28bd
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/util/JvmPauseMonitor.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.impala.util;
+
+import java.lang.management.GarbageCollectorMXBean;
+import java.lang.management.ManagementFactory;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.log4j.Logger;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import com.google.common.base.Stopwatch;
+
+/**
+ * Class which sets up a simple thread which runs in a loop sleeping
+ * for a short interval of time. If the sleep takes significantly longer
+ * than its target time, it implies that the JVM or host machine has
+ * paused processing, which may cause other problems. If such a pause is
+ * detected, the thread logs a message.
+ */
+public class JvmPauseMonitor {
+ private static final Logger LOG = Logger.getLogger(JvmPauseMonitor.class);
+
+ // The target sleep time.
+ private static final long SLEEP_INTERVAL_MS = 500;
+
+ // log WARN if we detect a pause longer than this threshold.
+ private long warnThresholdMs_;
+ private static final long WARN_THRESHOLD_MS = 10000;
+
+ // log INFO if we detect a pause longer than this threshold.
+ private long infoThresholdMs_;
+ private static final long INFO_THRESHOLD_MS = 1000;
+
+ // Daemon thread running the pause monitor loop.
+ private Thread monitorThread_;
+ private volatile boolean shouldRun = true;
+
+ // Singleton instance of this pause monitor.
+ public static JvmPauseMonitor INSTANCE = new JvmPauseMonitor();
+
+ // Initializes the pause monitor. No-op if called multiple times.
+ public static void initPauseMonitor() {
+ if (INSTANCE.isStarted()) return;
+ INSTANCE.init();
+ }
+
+ private JvmPauseMonitor() {
+ this(INFO_THRESHOLD_MS, WARN_THRESHOLD_MS);
+ }
+
+ private JvmPauseMonitor(long infoThresholdMs, long warnThresholdMs) {
+ this.infoThresholdMs_ = infoThresholdMs;
+ this.warnThresholdMs_ = warnThresholdMs;
+ }
+
+ protected void init() {
+ monitorThread_ = new Thread(new Monitor(), "JVM pause monitor");
+ monitorThread_.setDaemon(true);
+ monitorThread_.start();
+ }
+
+ public boolean isStarted() {
+ return monitorThread_ != null;
+ }
+
+ /**
+ * Helper method that formats the message to be logged, along with
+ * the GC metrics.
+ */
+ private String formatMessage(long extraSleepTime,
+ Map<String, GcTimes> gcTimesAfterSleep,
+ Map<String, GcTimes> gcTimesBeforeSleep) {
+
+ Set<String> gcBeanNames = Sets.intersection(
+ gcTimesAfterSleep.keySet(),
+ gcTimesBeforeSleep.keySet());
+ List<String> gcDiffs = Lists.newArrayList();
+ for (String name : gcBeanNames) {
+ GcTimes diff = gcTimesAfterSleep.get(name).subtract(
+ gcTimesBeforeSleep.get(name));
+ if (diff.gcCount != 0) {
+ gcDiffs.add("GC pool '" + name + "' had collection(s): " +
+ diff.toString());
+ }
+ }
+
+ String ret = "Detected pause in JVM or host machine (eg GC): " +
+ "pause of approximately " + extraSleepTime + "ms\n";
+ if (gcDiffs.isEmpty()) {
+ ret += "No GCs detected";
+ } else {
+ ret += Joiner.on("\n").join(gcDiffs);
+ }
+ return ret;
+ }
+
+ private Map<String, GcTimes> getGcTimes() {
+ Map<String, GcTimes> map = Maps.newHashMap();
+ List<GarbageCollectorMXBean> gcBeans =
+ ManagementFactory.getGarbageCollectorMXBeans();
+ for (GarbageCollectorMXBean gcBean : gcBeans) {
+ map.put(gcBean.getName(), new GcTimes(gcBean));
+ }
+ return map;
+ }
+
+ private static class GcTimes {
+ private GcTimes(GarbageCollectorMXBean gcBean) {
+ gcCount = gcBean.getCollectionCount();
+ gcTimeMillis = gcBean.getCollectionTime();
+ }
+
+ private GcTimes(long count, long time) {
+ this.gcCount = count;
+ this.gcTimeMillis = time;
+ }
+
+ private GcTimes subtract(GcTimes other) {
+ return new GcTimes(this.gcCount - other.gcCount,
+ this.gcTimeMillis - other.gcTimeMillis);
+ }
+
+ @Override
+ public String toString() {
+ return "count=" + gcCount + " time=" + gcTimeMillis + "ms";
+ }
+
+ private long gcCount;
+ private long gcTimeMillis;
+ }
+
+ /**
+ * Runnable instance of the pause monitor loop. Launched from serviceStart().
+ */
+ private class Monitor implements Runnable {
+ @Override
+ public void run() {
+ Stopwatch sw = new Stopwatch();
+ Map<String, GcTimes> gcTimesBeforeSleep = getGcTimes();
+ LOG.info("Starting JVM pause monitor");
+ while (shouldRun) {
+ sw.reset().start();
+ try {
+ Thread.sleep(SLEEP_INTERVAL_MS);
+ } catch (InterruptedException ie) {
+ return;
+ }
+ sw.stop();
+ long extraSleepTime = sw.elapsedTime(TimeUnit.MILLISECONDS) - SLEEP_INTERVAL_MS;
+ Map<String, GcTimes> gcTimesAfterSleep = getGcTimes();
+
+ if (extraSleepTime > warnThresholdMs_) {
+ LOG.warn(formatMessage(
+ extraSleepTime, gcTimesAfterSleep, gcTimesBeforeSleep));
+ } else if (extraSleepTime > infoThresholdMs_) {
+ LOG.info(formatMessage(
+ extraSleepTime, gcTimesAfterSleep, gcTimesBeforeSleep));
+ }
+ gcTimesBeforeSleep = gcTimesAfterSleep;
+ }
+ }
+ }
+
+ /**
+ * Simple 'main' to facilitate manual testing of the pause monitor.
+ *
+ * This main function just leaks memory into a list. Running this class
+ * with a 1GB heap will very quickly go into "GC hell" and result in
+ * log messages about the GC pauses.
+ */
+ @SuppressWarnings("resource")
+ public static void main(String []args) throws Exception {
+ JvmPauseMonitor monitor = new JvmPauseMonitor();
+ monitor.init();
+ List<String> list = Lists.newArrayList();
+ int i = 0;
+ while (true) {
+ list.add(String.valueOf(i++));
+ }
+ }
+}
+
+
http://git-wip-us.apache.org/repos/asf/impala/blob/4976ff3c/fe/src/test/java/org/apache/impala/util/JMXJsonUtilTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/util/JMXJsonUtilTest.java b/fe/src/test/java/org/apache/impala/util/JMXJsonUtilTest.java
new file mode 100644
index 0000000..95e9c3d
--- /dev/null
+++ b/fe/src/test/java/org/apache/impala/util/JMXJsonUtilTest.java
@@ -0,0 +1,56 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.util;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+
+import org.apache.impala.common.ImpalaException;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Unit tests for JMXJsonUtil functionality.
+ */
+public class JMXJsonUtilTest {
+
+ // Validates the JSON string returned by JMXJsonUtil.getJMXJson()
+ @Test
+ public void testJMXMetrics() throws ImpalaException {
+ String jmxJson = JMXJsonUtil.getJMXJson();
+ JsonNode rootNode = null;
+ // Validate the JSON.
+ try {
+ rootNode = new ObjectMapper().readTree(jmxJson);
+ } catch (IOException e) {
+ fail("Invalid JSON returned by getMxJson(): " + jmxJson);
+ }
+ Preconditions.checkNotNull(rootNode);
+ assertTrue("Invalid JSON: " + jmxJson, rootNode.hasNonNull("beans"));
+ List<String> values = rootNode.get("beans").findValuesAsText("name");
+ assertTrue("Invalid JSON: " + jmxJson,
+ values.contains("java.lang:type=MemoryPool,name=Metaspace"));
+ assertTrue("Invalid JSON: " + jmxJson, values.contains("java.lang:type=Runtime"));
+ }
+}
http://git-wip-us.apache.org/repos/asf/impala/blob/4976ff3c/fe/src/test/java/org/apache/impala/util/JniUtilTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/util/JniUtilTest.java b/fe/src/test/java/org/apache/impala/util/JniUtilTest.java
new file mode 100644
index 0000000..6166179
--- /dev/null
+++ b/fe/src/test/java/org/apache/impala/util/JniUtilTest.java
@@ -0,0 +1,54 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.util;
+
+import static org.junit.Assert.*;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import org.apache.impala.thrift.TGetJMXJsonResponse;
+import org.junit.Test;
+
+import org.apache.impala.common.ImpalaException;
+import org.apache.impala.common.JniUtil;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TProtocolFactory;
+import org.apache.impala.thrift.TCacheJarParams;
+
+import java.io.IOException;
+
+/**
+ * Unit tests for JniUtil functions.
+ */
+public class JniUtilTest {
+
+ private static TBinaryProtocol.Factory protocolFactory_ = new TBinaryProtocol.Factory();
+
+ // Unit test for JniUtil.serializetoThrift().
+ @Test
+ public void testSerializeToThrift() throws ImpalaException {
+ // Serialize and deserialize an simple thrift object.
+ TCacheJarParams testObject = new TCacheJarParams("test string");
+ byte[] testObjBytes = JniUtil.serializeToThrift(testObject, protocolFactory_);
+
+ TCacheJarParams deserializedTestObj = new TCacheJarParams();
+ JniUtil.deserializeThrift(protocolFactory_, deserializedTestObj, testObjBytes);
+ assertEquals(deserializedTestObj.hdfs_location, "test string");
+ }
+}
http://git-wip-us.apache.org/repos/asf/impala/blob/4976ff3c/tests/custom_cluster/test_pause_monitor.py
----------------------------------------------------------------------
diff --git a/tests/custom_cluster/test_pause_monitor.py b/tests/custom_cluster/test_pause_monitor.py
new file mode 100644
index 0000000..f4e616b
--- /dev/null
+++ b/tests/custom_cluster/test_pause_monitor.py
@@ -0,0 +1,38 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import signal
+import time
+
+from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
+
+
+class TestPauseMonitor(CustomClusterTestSuite):
+ """Class for pause monitor tests."""
+
+ def test_jvm_pause_monitor_logs_entries(self):
+ """This test injects a non-GC pause and confirms that that the JVM pause
+ monitor detects and logs it."""
+ impalad = self.cluster.get_first_impalad()
+ # Send a SIGSTOP for the process and block it for 5s.
+ impalad.kill(signal.SIGSTOP)
+ time.sleep(5)
+ impalad.kill(signal.SIGCONT)
+ # Wait for a few seconds for the logs to get flushed.
+ time.sleep(5)
+ # Check that the pause is detected.
+ self.assert_impalad_log_contains('INFO', "Detected pause in JVM or host machine")
http://git-wip-us.apache.org/repos/asf/impala/blob/4976ff3c/tests/webserver/test_web_pages.py
----------------------------------------------------------------------
diff --git a/tests/webserver/test_web_pages.py b/tests/webserver/test_web_pages.py
index 1ac1576..15d4f8f 100644
--- a/tests/webserver/test_web_pages.py
+++ b/tests/webserver/test_web_pages.py
@@ -36,6 +36,8 @@ class TestWebPage(ImpalaTestSuite):
RPCZ_URL = "http://localhost:{0}/rpcz"
THREAD_GROUP_URL = "http://localhost:{0}/thread-group"
METRICS_URL = "http://localhost:{0}/metrics"
+ JMX_URL = "http://localhost:{0}/jmx"
+
# log4j changes do not apply to the statestore since it doesn't
# have an embedded JVM. So we make two sets of ports to test the
# log level endpoints, one without the statestore port and the
@@ -63,6 +65,20 @@ class TestWebPage(ImpalaTestSuite):
result = impalad.service.read_debug_webpage("query_profile_encoded?query_id=123")
assert result.startswith("Could not obtain runtime profile: Query id")
+ def test_jmx_endpoint(self):
+ """Tests that the /jmx endpoint on the Catalog and Impalads returns a valid json."""
+ for port in self.TEST_PORTS_WITHOUT_SS:
+ input_url = self.JMX_URL.format(port)
+ response = requests.get(input_url)
+ assert response.status_code == requests.codes.ok
+ assert "application/json" == response.headers['Content-Type']
+ jmx_json = ""
+ try:
+ jmx_json = json.loads(response.text)
+ assert "beans" in jmx_json.keys(), "Ill formatted JSON returned: %s" % jmx_json
+ except ValueError:
+ assert False, "Invalid JSON returned from /jmx endpoint: %s" % jmx_json
+
def get_and_check_status(self, url, string_to_search = "", ports_to_test = None):
"""Helper method that polls a given url and asserts the return code is ok and
the response contains the input string."""
@@ -73,7 +89,6 @@ class TestWebPage(ImpalaTestSuite):
response = requests.get(input_url)
assert response.status_code == requests.codes.ok\
and string_to_search in response.text, "Offending url: " + input_url
- return response.text
def get_debug_page(self, page_url):
"""Returns the content of the debug page 'page_url' as json."""
@@ -83,8 +98,8 @@ class TestWebPage(ImpalaTestSuite):
def get_and_check_status_jvm(self, url, string_to_search = ""):
"""Calls get_and_check_status() for impalad and catalogd only"""
- return self.get_and_check_status(url, string_to_search,
- ports_to_test=self.TEST_PORTS_WITHOUT_SS)
+ self.get_and_check_status(url, string_to_search,
+ ports_to_test=self.TEST_PORTS_WITHOUT_SS)
def test_content_type(self):
"""Checks that an appropriate content-type is set for various types of pages."""