You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2023/06/21 03:19:09 UTC

[doris] branch master updated: [feature](jni) add jni metrics and attach to BE profile automatically (#21004)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new ef17289925 [feature](jni) add jni metrics and attach to BE profile automatically (#21004)
ef17289925 is described below

commit ef1728992515d3eafbbe795cc6010d441b1f7899
Author: Ashin Gau <As...@users.noreply.github.com>
AuthorDate: Wed Jun 21 11:19:02 2023 +0800

    [feature](jni) add jni metrics and attach to BE profile automatically (#21004)
    
    Add JNI metrics, for example:
    ```
    -  HudiJniScanner:  0ns
      -  FillBlockTime:  31.29ms
      -  GetRecordReaderTime:  1m5s
      -  JavaScanTime:  35s991ms
      -  OpenScannerTime:  1m6s
    ```
    Add three common performance metrics for JNI scanner:
    1. `OpenScannerTime`: Time to init and open JNI scanner
    2. `JavaScanTime`: Time to scan data and insert it into the vector table on the Java side
    3. `FillBlockTime`: Time to convert the Java vector table to a C++ block
    
    Also support user-defined metrics on the Java side. For example, when `OpenScannerTime` shows that the open process takes a long time and we want to determine which sub-step is responsible, we can add `GetRecordReaderTime` on the Java side.
    User-defined metrics from the Java side are attached to the BE profile automatically.
---
 be/src/util/jni-util.cpp                           | 81 ++++++++++++++++++++++
 be/src/util/jni-util.h                             |  3 +
 be/src/vec/exec/jni_connector.cpp                  | 74 ++++++++++++++------
 be/src/vec/exec/jni_connector.h                    | 19 ++++-
 .../java/org/apache/doris/hudi/HudiJniScanner.java |  9 +++
 .../org/apache/doris/common/jni/JniScanner.java    | 11 +++
 6 files changed, 174 insertions(+), 23 deletions(-)

diff --git a/be/src/util/jni-util.cpp b/be/src/util/jni-util.cpp
index 7b1843490a..0191a80bf8 100644
--- a/be/src/util/jni-util.cpp
+++ b/be/src/util/jni-util.cpp
@@ -243,6 +243,87 @@ Status JniUtil::GetJniExceptionMsg(JNIEnv* env, bool log_stack, const string& pr
     return Status::InternalError("{}{}", prefix, msg_str_guard.get());
 }
 
+// Copies `map` into a new java.util.HashMap<String, String>; the caller owns the local reference.
+jobject JniUtil::convert_to_java_map(JNIEnv* env, const std::map<std::string, std::string>& map) {
+    jclass hashmap_class = env->FindClass("java/util/HashMap");
+    jmethodID hashmap_constructor = env->GetMethodID(hashmap_class, "<init>", "(I)V");
+    // "(I)V" makes the JVM read a jint from the varargs, so narrow the size_t explicitly
+    jobject hashmap_object =
+            env->NewObject(hashmap_class, hashmap_constructor, static_cast<jint>(map.size()));
+    jmethodID hashmap_put = env->GetMethodID(
+            hashmap_class, "put", "(Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;");
+    for (const auto& it : map) {
+        jstring key = env->NewStringUTF(it.first.c_str());
+        jstring value = env->NewStringUTF(it.second.c_str());
+        env->CallObjectMethod(hashmap_object, hashmap_put, key, value);
+        env->DeleteLocalRef(key);
+        env->DeleteLocalRef(value);
+    }
+    env->DeleteLocalRef(hashmap_class);
+    return hashmap_object;
+}
+
+// Copies a java.util.Map<String, String> into a std::map. A null `map` yields an empty result
+// and entries with a null key or value are skipped. If a Java exception is raised while
+// iterating, the entries read so far are returned and the exception is left pending.
+std::map<std::string, std::string> JniUtil::convert_to_cpp_map(JNIEnv* env, jobject map) {
+    std::map<std::string, std::string> resultMap;
+    if (map == nullptr) {
+        return resultMap;
+    }
+
+    // Look up Map.entrySet(), Set.iterator(), Iterator.hasNext()/next() and the Map.Entry getters
+    jclass mapClass = env->FindClass("java/util/Map");
+    jmethodID entrySetMethod = env->GetMethodID(mapClass, "entrySet", "()Ljava/util/Set;");
+    jclass setClass = env->FindClass("java/util/Set");
+    jmethodID iteratorSetMethod = env->GetMethodID(setClass, "iterator", "()Ljava/util/Iterator;");
+    jclass iteratorClass = env->FindClass("java/util/Iterator");
+    jmethodID hasNextMethod = env->GetMethodID(iteratorClass, "hasNext", "()Z");
+    jmethodID nextMethod = env->GetMethodID(iteratorClass, "next", "()Ljava/lang/Object;");
+    jclass entryClass = env->FindClass("java/util/Map$Entry");
+    jmethodID getKeyMethod = env->GetMethodID(entryClass, "getKey", "()Ljava/lang/Object;");
+    jmethodID getValueMethod = env->GetMethodID(entryClass, "getValue", "()Ljava/lang/Object;");
+
+    jobject entrySet = env->CallObjectMethod(map, entrySetMethod);
+    jobject iteratorSet =
+            entrySet == nullptr ? nullptr : env->CallObjectMethod(entrySet, iteratorSetMethod);
+
+    // JNI calls made while an exception is pending are undefined, so stop at the first one
+    while (iteratorSet != nullptr && !env->ExceptionCheck() &&
+           env->CallBooleanMethod(iteratorSet, hasNextMethod) && !env->ExceptionCheck()) {
+        jobject entry = env->CallObjectMethod(iteratorSet, nextMethod);
+        if (entry == nullptr) {
+            break;
+        }
+        auto javaKey = static_cast<jstring>(env->CallObjectMethod(entry, getKeyMethod));
+        auto javaValue = static_cast<jstring>(env->CallObjectMethod(entry, getValueMethod));
+        // GetStringUTFChars must not be given null and returns nullptr if the copy fails
+        const char* key = javaKey != nullptr ? env->GetStringUTFChars(javaKey, nullptr) : nullptr;
+        const char* value = key != nullptr && javaValue != nullptr
+                                    ? env->GetStringUTFChars(javaValue, nullptr)
+                                    : nullptr;
+        if (value != nullptr) {
+            resultMap[key] = value;
+            env->ReleaseStringUTFChars(javaValue, value);
+        }
+        if (key != nullptr) {
+            env->ReleaseStringUTFChars(javaKey, key);
+        }
+        env->DeleteLocalRef(entry);
+        env->DeleteLocalRef(javaKey);
+        env->DeleteLocalRef(javaValue);
+    }
+
+    env->DeleteLocalRef(iteratorSet);
+    env->DeleteLocalRef(entrySet);
+    env->DeleteLocalRef(mapClass);
+    env->DeleteLocalRef(setClass);
+    env->DeleteLocalRef(iteratorClass);
+    env->DeleteLocalRef(entryClass);
+
+    return resultMap;
+}
+
 Status JniUtil::GetGlobalClassRef(JNIEnv* env, const char* class_str, jclass* class_ref) {
     *class_ref = NULL;
     jclass local_cl = env->FindClass(class_str);
diff --git a/be/src/util/jni-util.h b/be/src/util/jni-util.h
index f43d5577d1..bb08612be0 100644
--- a/be/src/util/jni-util.h
+++ b/be/src/util/jni-util.h
@@ -75,6 +75,9 @@ public:
         return INITIAL_RESERVED_BUFFER_SIZE << n;
     }
 
+    static jobject convert_to_java_map(JNIEnv* env, const std::map<std::string, std::string>& map);
+    static std::map<std::string, std::string> convert_to_cpp_map(JNIEnv* env, jobject map);
+
 private:
     static Status GetJNIEnvSlowPath(JNIEnv** env);
 
diff --git a/be/src/vec/exec/jni_connector.cpp b/be/src/vec/exec/jni_connector.cpp
index 178bcd8bb7..d3fafe5974 100644
--- a/be/src/vec/exec/jni_connector.cpp
+++ b/be/src/vec/exec/jni_connector.cpp
@@ -63,6 +63,12 @@ JniConnector::~JniConnector() {
 }
 
 Status JniConnector::open(RuntimeState* state, RuntimeProfile* profile) {
+    _state = state;
+    _profile = profile;
+    ADD_TIMER(_profile, _connector_name.c_str());
+    _open_scanner_time = ADD_CHILD_TIMER(_profile, "OpenScannerTime", _connector_name.c_str());
+    _java_scan_time = ADD_CHILD_TIMER(_profile, "JavaScanTime", _connector_name.c_str());
+    _fill_block_time = ADD_CHILD_TIMER(_profile, "FillBlockTime", _connector_name.c_str());
     // cannot put the env into fields, because frames in an env object is limited
     // to avoid limited frames in a thread, we should get local env in a method instead of in whole object.
     JNIEnv* env = nullptr;
@@ -70,6 +76,7 @@ Status JniConnector::open(RuntimeState* state, RuntimeProfile* profile) {
     if (env == nullptr) {
         return Status::InternalError("Failed to get/create JVM");
     }
+    SCOPED_TIMER(_open_scanner_time);
     RETURN_IF_ERROR(_init_jni_scanner(env, state->batch_size()));
     // Call org.apache.doris.common.jni.JniScanner#open
     env->CallVoidMethod(_jni_scanner_obj, _jni_scanner_open);
@@ -94,7 +101,11 @@ Status JniConnector::get_nex_block(Block* block, size_t* read_rows, bool* eof) {
     // return the address of meta information
     JNIEnv* env = nullptr;
     RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env));
-    long meta_address = env->CallLongMethod(_jni_scanner_obj, _jni_scanner_get_next_batch);
+    long meta_address = 0;
+    {
+        SCOPED_TIMER(_java_scan_time);
+        meta_address = env->CallLongMethod(_jni_scanner_obj, _jni_scanner_get_next_batch);
+    }
     RETURN_ERROR_IF_EXC(env);
     if (meta_address == 0) {
         // Address == 0 when there's no data in scanner
@@ -118,10 +129,43 @@ Status JniConnector::get_nex_block(Block* block, size_t* read_rows, bool* eof) {
     return Status::OK();
 }
 
+std::map<std::string, std::string> JniConnector::get_statistics(JNIEnv* env) {
+    jobject metrics = env->CallObjectMethod(_jni_scanner_obj, _jni_scanner_get_statistics);
+    std::map<std::string, std::string> result = JniUtil::convert_to_cpp_map(env, metrics);
+    env->DeleteLocalRef(metrics);
+    return result;
+}
+
 Status JniConnector::close() {
     if (!_closed) {
         JNIEnv* env = nullptr;
         RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env));
+        // update scanner metrics
+        for (const auto& metric : get_statistics(env)) {
+            std::vector<std::string> type_and_name = split(metric.first, ":");
+            if (type_and_name.size() != 2) {
+                LOG(WARNING) << "Name of JNI Scanner metric should be pattern like "
+                             << "'metricType:metricName'";
+                continue;
+            }
+            long metric_value = std::stol(metric.second);
+            RuntimeProfile::Counter* scanner_counter;
+            if (type_and_name[0] == "timer") {
+                scanner_counter =
+                        ADD_CHILD_TIMER(_profile, type_and_name[1], _connector_name.c_str());
+            } else if (type_and_name[0] == "counter") {
+                scanner_counter = ADD_CHILD_COUNTER(_profile, type_and_name[1], TUnit::UNIT,
+                                                    _connector_name.c_str());
+            } else if (type_and_name[0] == "bytes") {
+                scanner_counter = ADD_CHILD_COUNTER(_profile, type_and_name[1], TUnit::BYTES,
+                                                    _connector_name.c_str());
+            } else {
+                LOG(WARNING) << "Type of JNI Scanner metric should be timer, counter or bytes";
+                continue;
+            }
+            COUNTER_UPDATE(scanner_counter, metric_value);
+        }
+
         // _fill_block may be failed and returned, we should release table in close.
         // org.apache.doris.common.jni.JniScanner#releaseTable is idempotent
         env->CallVoidMethod(_jni_scanner_obj, _jni_scanner_release_table);
@@ -145,41 +189,27 @@ Status JniConnector::_init_jni_scanner(JNIEnv* env, int batch_size) {
     RETURN_ERROR_IF_EXC(env);
 
     // prepare constructor parameters
-    jclass hashmap_class = env->FindClass("java/util/HashMap");
-    jmethodID hashmap_constructor = env->GetMethodID(hashmap_class, "<init>", "(I)V");
-    jobject hashmap_object =
-            env->NewObject(hashmap_class, hashmap_constructor, _scanner_params.size());
-    jmethodID hashmap_put = env->GetMethodID(
-            hashmap_class, "put", "(Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;");
-    RETURN_ERROR_IF_EXC(env);
-    for (const auto& it : _scanner_params) {
-        jstring key = env->NewStringUTF(it.first.c_str());
-        jstring value = env->NewStringUTF(it.second.c_str());
-        env->CallObjectMethod(hashmap_object, hashmap_put, key, value);
-        env->DeleteLocalRef(key);
-        env->DeleteLocalRef(value);
-    }
-    env->DeleteLocalRef(hashmap_class);
-    _jni_scanner_obj =
+    jobject hashmap_object = JniUtil::convert_to_java_map(env, _scanner_params);
+    jobject jni_scanner_obj =
             env->NewObject(_jni_scanner_cls, scanner_constructor, batch_size, hashmap_object);
     env->DeleteLocalRef(hashmap_object);
     RETURN_ERROR_IF_EXC(env);
 
     _jni_scanner_open = env->GetMethodID(_jni_scanner_cls, "open", "()V");
-    RETURN_ERROR_IF_EXC(env);
     _jni_scanner_get_next_batch = env->GetMethodID(_jni_scanner_cls, "getNextBatchMeta", "()J");
-    RETURN_ERROR_IF_EXC(env);
     _jni_scanner_close = env->GetMethodID(_jni_scanner_cls, "close", "()V");
-    RETURN_ERROR_IF_EXC(env);
     _jni_scanner_release_column = env->GetMethodID(_jni_scanner_cls, "releaseColumn", "(I)V");
-    RETURN_ERROR_IF_EXC(env);
     _jni_scanner_release_table = env->GetMethodID(_jni_scanner_cls, "releaseTable", "()V");
+    _jni_scanner_get_statistics =
+            env->GetMethodID(_jni_scanner_cls, "getStatistics", "()Ljava/util/Map;");
+    RETURN_IF_ERROR(JniUtil::LocalToGlobalRef(env, jni_scanner_obj, &_jni_scanner_obj));
+    env->DeleteLocalRef(jni_scanner_obj);
     RETURN_ERROR_IF_EXC(env);
-    RETURN_IF_ERROR(JniUtil::LocalToGlobalRef(env, _jni_scanner_obj, &_jni_scanner_obj));
     return Status::OK();
 }
 
 Status JniConnector::_fill_block(Block* block, size_t num_rows) {
+    SCOPED_TIMER(_fill_block_time);
     for (int i = 0; i < _column_names.size(); ++i) {
         auto& column_with_type_and_name = block->get_by_name(_column_names[i]);
         auto& column_ptr = column_with_type_and_name.column;
diff --git a/be/src/vec/exec/jni_connector.h b/be/src/vec/exec/jni_connector.h
index 77be2329e0..36be5379a9 100644
--- a/be/src/vec/exec/jni_connector.h
+++ b/be/src/vec/exec/jni_connector.h
@@ -163,7 +163,10 @@ public:
                  std::vector<std::string> column_names)
             : _connector_class(std::move(connector_class)),
               _scanner_params(std::move(scanner_params)),
-              _column_names(std::move(column_names)) {}
+              _column_names(std::move(column_names)) {
+        // Use java class name as connector name
+        _connector_name = split(_connector_class, "/").back();
+    }
 
     /// Should release jni resources if other functions are failed.
     ~JniConnector();
@@ -197,6 +200,11 @@ public:
      */
     Status get_nex_block(Block* block, size_t* read_rows, bool* eof);
 
+    /**
+     * Get performance metrics from java scanner
+     */
+    std::map<std::string, std::string> get_statistics(JNIEnv* env);
+
     /**
      * Close scanner and release jni resources.
      */
@@ -210,10 +218,18 @@ public:
     static Status generate_meta_info(Block* block, std::unique_ptr<long[]>& meta);
 
 private:
+    std::string _connector_name;
     std::string _connector_class;
     std::map<std::string, std::string> _scanner_params;
     std::vector<std::string> _column_names;
 
+    RuntimeState* _state;
+    RuntimeProfile* _profile;
+    RuntimeProfile::Counter* _open_scanner_time;
+    RuntimeProfile::Counter* _java_scan_time;
+    RuntimeProfile::Counter* _fill_block_time;
+    std::map<std::string, RuntimeProfile::Counter*> _scanner_profile;
+
     size_t _has_read = 0;
 
     bool _closed = false;
@@ -224,6 +240,7 @@ private:
     jmethodID _jni_scanner_close;
     jmethodID _jni_scanner_release_column;
     jmethodID _jni_scanner_release_table;
+    jmethodID _jni_scanner_get_statistics;
 
     long* _meta_ptr;
     int _meta_index;
diff --git a/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/HudiJniScanner.java b/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/HudiJniScanner.java
index ce86b27ad6..556c6b2e7b 100644
--- a/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/HudiJniScanner.java
+++ b/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/HudiJniScanner.java
@@ -41,6 +41,7 @@ import org.apache.log4j.Logger;
 import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -63,6 +64,8 @@ public class HudiJniScanner extends JniScanner {
     private Deserializer deserializer;
     private final ClassLoader classLoader;
 
+    private long getRecordReaderTimeNs = 0;
+
     public HudiJniScanner(int fetchSize, Map<String, String> params) {
         if (LOG.isDebugEnabled()) {
             LOG.debug("Hudi JNI params:\n" + params.entrySet().stream().map(kv -> kv.getKey() + "=" + kv.getValue())
@@ -182,6 +185,7 @@ public class HudiJniScanner extends JniScanner {
             }
         }, 100, 1000, TimeUnit.MILLISECONDS);
 
+        long startTime = System.nanoTime();
         if (ugi != null) {
             reader = ugi.doAs((PrivilegedExceptionAction<RecordReader<NullWritable, ArrayWritable>>) () -> {
                 RecordReader<NullWritable, ArrayWritable> ugiReader
@@ -193,6 +197,7 @@ public class HudiJniScanner extends JniScanner {
             reader = (RecordReader<NullWritable, ArrayWritable>) inputFormatClass
                     .getRecordReader(hudiSplit, jobConf, Reporter.NULL);
         }
+        getRecordReaderTimeNs += System.nanoTime() - startTime;
         isKilled.set(true);
         executorService.shutdownNow();
 
@@ -207,4 +212,8 @@ public class HudiJniScanner extends JniScanner {
         }
     }
 
+    @Override
+    public Map<String, String> getStatistics() {
+        return Collections.singletonMap("timer:GetRecordReaderTime", String.valueOf(getRecordReaderTimeNs));
+    }
 }
diff --git a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/JniScanner.java b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/JniScanner.java
index 7e7391520e..89c960bddc 100644
--- a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/JniScanner.java
+++ b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/JniScanner.java
@@ -24,6 +24,8 @@ import org.apache.doris.common.jni.vec.ScanPredicate;
 import org.apache.doris.common.jni.vec.VectorTable;
 
 import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
 
 public abstract class JniScanner {
     protected VectorTable vectorTable;
@@ -79,6 +81,15 @@ public abstract class JniScanner {
         return getMetaAddress(numRows);
     }
 
+    /**
+     * Get performance metrics. Each key must follow the pattern "metricType:metricName", where
+     * metricType is one of timer, counter or bytes; each value must be an integer in string form.
+     * The c++ side attaches every metricName to the BE profile automatically.
+     */
+    public Map<String, String> getStatistics() {
+        return Collections.emptyMap();
+    }
+
     private long getMetaAddress(int numRows) {
         vectorTable.setNumRows(numRows);
         return vectorTable.getMetaAddress();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org