You are viewing a plain-text version of this content. The canonical link to it was a hyperlink that is not preserved in this plain-text version.
Posted to commits@doris.apache.org by yi...@apache.org on 2023/06/26 01:26:48 UTC

[doris] branch branch-1.2-lts updated: [cherry-pick](udaf) pick some java-udaf from master (#21012)

This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-1.2-lts
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-1.2-lts by this push:
     new eb3d9236dd [cherry-pick](udaf) pick some java-udaf from master (#21012)
eb3d9236dd is described below

commit eb3d9236dd6c0697907200a8bc652ee101ed7b71
Author: zhangstar333 <87...@users.noreply.github.com>
AuthorDate: Mon Jun 26 09:26:39 2023 +0800

    [cherry-pick](udaf) pick some java-udaf from master (#21012)
---
 .../aggregate_function_java_udaf.h                 | 27 ++++++++++--
 .../java/org/apache/doris/udf/BaseExecutor.java    |  1 +
 .../java/org/apache/doris/udf/UdafExecutor.java    | 48 +++++++++++++++++++---
 3 files changed, 67 insertions(+), 9 deletions(-)

diff --git a/be/src/vec/aggregate_functions/aggregate_function_java_udaf.h b/be/src/vec/aggregate_functions/aggregate_function_java_udaf.h
index 2ea485bd7a..35e555ab0b 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_java_udaf.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_java_udaf.h
@@ -44,6 +44,7 @@ const char* UDAF_EXECUTOR_ADD_SIGNATURE = "(ZJJ)V";
 const char* UDAF_EXECUTOR_SERIALIZE_SIGNATURE = "(J)[B";
 const char* UDAF_EXECUTOR_MERGE_SIGNATURE = "(J[B)V";
 const char* UDAF_EXECUTOR_RESULT_SIGNATURE = "(JJ)Z";
+const char* UDAF_EXECUTOR_RESET_SIGNATURE = "(J)V";
 // Calling Java method about those signature means: "(argument-types)return-type"
 // https://www.iitk.ac.in/esc101/05Aug/tutorial/native1.1/implementing/method.html
 
@@ -183,6 +184,15 @@ public:
         return JniUtil::GetJniExceptionMsg(env);
     }
 
+    // Invokes UdafExecutor.reset(place) through JNI and returns the result of
+    // JniUtil::GetJniExceptionMsg, which presumably reports any pending Java exception.
+    Status reset(int64_t place) {
+        JNIEnv* env = nullptr;
+        RETURN_NOT_OK_STATUS_WITH_WARN(JniUtil::GetJNIEnv(&env), "Java-Udaf reset function");
+        env->CallNonvirtualVoidMethod(executor_obj, executor_cl, executor_reset_id, place);
+        return JniUtil::GetJniExceptionMsg(env);
+    }
+
     void read(BufferReadable& buf) { read_binary(serialize_data, buf); }
 
     Status destroy() {
@@ -258,6 +268,7 @@ private:
 
         RETURN_IF_ERROR(register_id("<init>", UDAF_EXECUTOR_CTOR_SIGNATURE, executor_ctor_id));
         RETURN_IF_ERROR(register_id("add", UDAF_EXECUTOR_ADD_SIGNATURE, executor_add_id));
+        RETURN_IF_ERROR(register_id("reset", UDAF_EXECUTOR_RESET_SIGNATURE, executor_reset_id));
         RETURN_IF_ERROR(register_id("close", UDAF_EXECUTOR_CLOSE_SIGNATURE, executor_close_id));
         RETURN_IF_ERROR(register_id("merge", UDAF_EXECUTOR_MERGE_SIGNATURE, executor_merge_id));
         RETURN_IF_ERROR(
@@ -279,6 +290,7 @@ private:
     jmethodID executor_add_id;
     jmethodID executor_merge_id;
     jmethodID executor_serialize_id;
+    jmethodID executor_reset_id;
     jmethodID executor_result_id;
     jmethodID executor_close_id;
     jmethodID executor_destroy_id;
@@ -367,10 +379,32 @@ public:
         this->data(_exec_place).add(places_address, true, columns, 0, batch_size, argument_types);
     }
 
-    // TODO: reset function should be implement also in struct data
-    void reset(AggregateDataPtr /*place*/) const override {
-        LOG(WARNING) << " shouldn't going reset function, there maybe some error about function "
-                     << _fn.name.function_name;
+    // Adds rows [frame_start, frame_end) of `columns`, after clamping the frame to the partition
+    // bounds [partition_start, partition_end), into the single aggregation state at `place`.
+    void add_range_single_place(int64_t partition_start, int64_t partition_end, int64_t frame_start,
+                                int64_t frame_end, AggregateDataPtr place, const IColumn** columns,
+                                Arena* /*arena*/) const override {
+        frame_start = std::max<int64_t>(frame_start, partition_start);
+        frame_end = std::min<int64_t>(frame_end, partition_end);
+        if (frame_start >= frame_end) {
+            // Empty frame: nothing to add. UdafExecutor.add() iterates with do-while loops, so
+            // passing an empty range would still process the row at `frame_start`.
+            return;
+        }
+        int64_t places_address[1];
+        places_address[0] = reinterpret_cast<int64_t>(place);
+        this->data(_exec_place)
+                .add(places_address, true, columns, frame_start, frame_end, argument_types);
+    }
+
+    // Restores the Java-side state stored for `place` via UdafExecutor.reset(). This override
+    // cannot return an error, so a failed reset is logged rather than silently discarded.
+    void reset(AggregateDataPtr place) const override {
+        Status st = this->data(_exec_place).reset(reinterpret_cast<int64_t>(place));
+        if (!st.ok()) {
+            LOG(WARNING) << "Java-Udaf reset failed for function " << _fn.name.function_name
+                         << ": " << st.to_string();
+        }
     }
 
     void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs,
diff --git a/fe/java-udf/src/main/java/org/apache/doris/udf/BaseExecutor.java b/fe/java-udf/src/main/java/org/apache/doris/udf/BaseExecutor.java
index 9e481b521a..16d85211b1 100644
--- a/fe/java-udf/src/main/java/org/apache/doris/udf/BaseExecutor.java
+++ b/fe/java-udf/src/main/java/org/apache/doris/udf/BaseExecutor.java
@@ -42,6 +42,7 @@ public abstract class BaseExecutor {
     public static final String UDAF_CREATE_FUNCTION = "create";
     public static final String UDAF_DESTROY_FUNCTION = "destroy";
     public static final String UDAF_ADD_FUNCTION = "add";
+    public static final String UDAF_RESET_FUNCTION = "reset";
     public static final String UDAF_SERIALIZE_FUNCTION = "serialize";
     public static final String UDAF_DESERIALIZE_FUNCTION = "deserialize";
     public static final String UDAF_MERGE_FUNCTION = "merge";
diff --git a/fe/java-udf/src/main/java/org/apache/doris/udf/UdafExecutor.java b/fe/java-udf/src/main/java/org/apache/doris/udf/UdafExecutor.java
index 4f88fa967e..fba0dfd361 100644
--- a/fe/java-udf/src/main/java/org/apache/doris/udf/UdafExecutor.java
+++ b/fe/java-udf/src/main/java/org/apache/doris/udf/UdafExecutor.java
@@ -71,10 +71,21 @@ public class UdafExecutor extends BaseExecutor {
         try {
             long idx = rowStart;
             do {
-                Long curPlace = UdfUtils.UNSAFE.getLong(null, UdfUtils.UNSAFE.getLong(null, inputPlacesPtr) + 8L * idx);
+                Long curPlace = null;
+                if (isSinglePlace) {
+                    curPlace = UdfUtils.UNSAFE.getLong(null, UdfUtils.UNSAFE.getLong(null, inputPlacesPtr));
+                } else {
+                    curPlace = UdfUtils.UNSAFE.getLong(null, UdfUtils.UNSAFE.getLong(null, inputPlacesPtr) + 8L * idx);
+                }
                 Object[] inputArgs = new Object[argTypes.length + 1];
-                stateObjMap.putIfAbsent(curPlace, createAggState());
-                inputArgs[0] = stateObjMap.get(curPlace);
+                Object state = stateObjMap.get(curPlace);
+                if (state != null) {
+                    inputArgs[0] = state;
+                } else {
+                    Object newState = createAggState();
+                    stateObjMap.put(curPlace, newState);
+                    inputArgs[0] = newState;
+                }
                 do {
                     Object[] inputObjects = allocateInputObjects(idx, 1);
                     for (int i = 0; i < argTypes.length; ++i) {
@@ -130,6 +141,30 @@ public class UdafExecutor extends BaseExecutor {
         }
     }
 
+    /**
+     * Invokes the UDAF's reset method on the state stored for {@code place}, restoring that state
+     * to its initial value. Does nothing if no state has been created for {@code place} yet.
+     */
+    public void reset(long place) throws UdfRuntimeException {
+        try {
+            Object state = stateObjMap.get((Long) place);
+            if (state == null) {
+                return;
+            }
+            if (!allMethods.containsKey(UDAF_RESET_FUNCTION)) {
+                // Report a clear error instead of an NPE from allMethods.get(...).invoke(...).
+                throw new IllegalStateException("UDAF class does not define a 'reset' method");
+            }
+            allMethods.get(UDAF_RESET_FUNCTION).invoke(udf, new Object[] {state});
+        } catch (Exception e) {
+            // getCause() may be null (e.g. for the IllegalStateException above), so fall back to
+            // the exception itself instead of throwing an NPE from this handler.
+            Throwable cause = e.getCause() != null ? e.getCause() : e;
+            LOG.warn("invoke reset function meet some error: " + cause);
+            throw new UdfRuntimeException("UDAF failed to reset: ", e);
+        }
+    }
+
     /**
      * invoke merge function and it's have done deserialze.
      * here call deserialize first, and call merge.
@@ -143,8 +178,14 @@ public class UdafExecutor extends BaseExecutor {
             allMethods.get(UDAF_DESERIALIZE_FUNCTION).invoke(udf, args);
             args[1] = args[0];
             Long curPlace = place;
-            stateObjMap.putIfAbsent(curPlace, createAggState());
-            args[0] = stateObjMap.get(curPlace);
+            Object state = stateObjMap.get(curPlace);
+            if (state != null) {
+                args[0] = state;
+            } else {
+                Object newState = createAggState();
+                stateObjMap.put(curPlace, newState);
+                args[0] = newState;
+            }
             allMethods.get(UDAF_MERGE_FUNCTION).invoke(udf, args);
         } catch (Exception e) {
             throw new UdfRuntimeException("UDAF failed to merge: ", e);
@@ -156,6 +197,9 @@ public class UdafExecutor extends BaseExecutor {
      */
     public boolean getValue(long row, long place) throws UdfRuntimeException {
         try {
+            if (stateObjMap.get(place) == null) {
+                stateObjMap.put(place, createAggState());
+            }
             return storeUdfResult(allMethods.get(UDAF_RESULT_FUNCTION).invoke(udf, stateObjMap.get((Long) place)),
                     row, retClass);
         } catch (Exception e) {
@@ -212,6 +256,7 @@ public class UdafExecutor extends BaseExecutor {
                     case UDAF_CREATE_FUNCTION:
                     case UDAF_MERGE_FUNCTION:
                     case UDAF_SERIALIZE_FUNCTION:
+                    case UDAF_RESET_FUNCTION:
                     case UDAF_DESERIALIZE_FUNCTION: {
                         allMethods.put(methods[idx].getName(), methods[idx]);
                         break;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org