You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this copy.
Posted to commits@doris.apache.org by yi...@apache.org on 2023/06/26 01:26:48 UTC
[doris] branch branch-1.2-lts updated: [cherry-pick](udaf) pick some java-udaf from master (#21012)
This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-1.2-lts
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-1.2-lts by this push:
new eb3d9236dd [cherry-pick](udaf) pick some java-udaf from master (#21012)
eb3d9236dd is described below
commit eb3d9236dd6c0697907200a8bc652ee101ed7b71
Author: zhangstar333 <87...@users.noreply.github.com>
AuthorDate: Mon Jun 26 09:26:39 2023 +0800
[cherry-pick](udaf) pick some java-udaf from master (#21012)
---
.../aggregate_function_java_udaf.h | 27 ++++++++++--
.../java/org/apache/doris/udf/BaseExecutor.java | 1 +
.../java/org/apache/doris/udf/UdafExecutor.java | 48 +++++++++++++++++++---
3 files changed, 67 insertions(+), 9 deletions(-)
diff --git a/be/src/vec/aggregate_functions/aggregate_function_java_udaf.h b/be/src/vec/aggregate_functions/aggregate_function_java_udaf.h
index 2ea485bd7a..35e555ab0b 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_java_udaf.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_java_udaf.h
@@ -44,6 +44,7 @@ const char* UDAF_EXECUTOR_ADD_SIGNATURE = "(ZJJ)V";
const char* UDAF_EXECUTOR_SERIALIZE_SIGNATURE = "(J)[B";
const char* UDAF_EXECUTOR_MERGE_SIGNATURE = "(J[B)V";
const char* UDAF_EXECUTOR_RESULT_SIGNATURE = "(JJ)Z";
+const char* UDAF_EXECUTOR_RESET_SIGNATURE = "(J)V";
// Calling Java method about those signature means: "(argument-types)return-type"
// https://www.iitk.ac.in/esc101/05Aug/tutorial/native1.1/implementing/method.html
@@ -183,6 +184,13 @@ public:
return JniUtil::GetJniExceptionMsg(env);
}
+ Status reset(int64_t place) {
+ JNIEnv* env = nullptr;
+ RETURN_NOT_OK_STATUS_WITH_WARN(JniUtil::GetJNIEnv(&env), "Java-Udaf reset function");
+ env->CallNonvirtualVoidMethod(executor_obj, executor_cl, executor_reset_id, place);
+ return JniUtil::GetJniExceptionMsg(env);
+ }
+
void read(BufferReadable& buf) { read_binary(serialize_data, buf); }
Status destroy() {
@@ -258,6 +266,7 @@ private:
RETURN_IF_ERROR(register_id("<init>", UDAF_EXECUTOR_CTOR_SIGNATURE, executor_ctor_id));
RETURN_IF_ERROR(register_id("add", UDAF_EXECUTOR_ADD_SIGNATURE, executor_add_id));
+ RETURN_IF_ERROR(register_id("reset", UDAF_EXECUTOR_RESET_SIGNATURE, executor_reset_id));
RETURN_IF_ERROR(register_id("close", UDAF_EXECUTOR_CLOSE_SIGNATURE, executor_close_id));
RETURN_IF_ERROR(register_id("merge", UDAF_EXECUTOR_MERGE_SIGNATURE, executor_merge_id));
RETURN_IF_ERROR(
@@ -279,6 +288,7 @@ private:
jmethodID executor_add_id;
jmethodID executor_merge_id;
jmethodID executor_serialize_id;
+ jmethodID executor_reset_id;
jmethodID executor_result_id;
jmethodID executor_close_id;
jmethodID executor_destroy_id;
@@ -367,10 +377,19 @@ public:
this->data(_exec_place).add(places_address, true, columns, 0, batch_size, argument_types);
}
- // TODO: reset function should be implement also in struct data
- void reset(AggregateDataPtr /*place*/) const override {
- LOG(WARNING) << " shouldn't going reset function, there maybe some error about function "
- << _fn.name.function_name;
+ void add_range_single_place(int64_t partition_start, int64_t partition_end, int64_t frame_start,
+ int64_t frame_end, AggregateDataPtr place, const IColumn** columns,
+ Arena* /*arena*/) const override {
+ frame_start = std::max<int64_t>(frame_start, partition_start);
+ frame_end = std::min<int64_t>(frame_end, partition_end);
+ int64_t places_address[1];
+ places_address[0] = reinterpret_cast<int64_t>(place);
+ this->data(_exec_place)
+ .add(places_address, true, columns, frame_start, frame_end, argument_types);
+ }
+
+ void reset(AggregateDataPtr place) const override {
+ this->data(_exec_place).reset(reinterpret_cast<int64_t>(place));
}
void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs,
diff --git a/fe/java-udf/src/main/java/org/apache/doris/udf/BaseExecutor.java b/fe/java-udf/src/main/java/org/apache/doris/udf/BaseExecutor.java
index 9e481b521a..16d85211b1 100644
--- a/fe/java-udf/src/main/java/org/apache/doris/udf/BaseExecutor.java
+++ b/fe/java-udf/src/main/java/org/apache/doris/udf/BaseExecutor.java
@@ -42,6 +42,7 @@ public abstract class BaseExecutor {
public static final String UDAF_CREATE_FUNCTION = "create";
public static final String UDAF_DESTROY_FUNCTION = "destroy";
public static final String UDAF_ADD_FUNCTION = "add";
+ public static final String UDAF_RESET_FUNCTION = "reset";
public static final String UDAF_SERIALIZE_FUNCTION = "serialize";
public static final String UDAF_DESERIALIZE_FUNCTION = "deserialize";
public static final String UDAF_MERGE_FUNCTION = "merge";
diff --git a/fe/java-udf/src/main/java/org/apache/doris/udf/UdafExecutor.java b/fe/java-udf/src/main/java/org/apache/doris/udf/UdafExecutor.java
index 4f88fa967e..fba0dfd361 100644
--- a/fe/java-udf/src/main/java/org/apache/doris/udf/UdafExecutor.java
+++ b/fe/java-udf/src/main/java/org/apache/doris/udf/UdafExecutor.java
@@ -71,10 +71,21 @@ public class UdafExecutor extends BaseExecutor {
try {
long idx = rowStart;
do {
- Long curPlace = UdfUtils.UNSAFE.getLong(null, UdfUtils.UNSAFE.getLong(null, inputPlacesPtr) + 8L * idx);
+ Long curPlace = null;
+ if (isSinglePlace) {
+ curPlace = UdfUtils.UNSAFE.getLong(null, UdfUtils.UNSAFE.getLong(null, inputPlacesPtr));
+ } else {
+ curPlace = UdfUtils.UNSAFE.getLong(null, UdfUtils.UNSAFE.getLong(null, inputPlacesPtr) + 8L * idx);
+ }
Object[] inputArgs = new Object[argTypes.length + 1];
- stateObjMap.putIfAbsent(curPlace, createAggState());
- inputArgs[0] = stateObjMap.get(curPlace);
+ Object state = stateObjMap.get(curPlace);
+ if (state != null) {
+ inputArgs[0] = state;
+ } else {
+ Object newState = createAggState();
+ stateObjMap.put(curPlace, newState);
+ inputArgs[0] = newState;
+ }
do {
Object[] inputObjects = allocateInputObjects(idx, 1);
for (int i = 0; i < argTypes.length; ++i) {
@@ -130,6 +141,23 @@ public class UdafExecutor extends BaseExecutor {
}
}
+ /*
+ * invoke reset function and reset the state to init.
+ */
+ public void reset(long place) throws UdfRuntimeException {
+ try {
+ Object[] args = new Object[1];
+ args[0] = stateObjMap.get((Long) place);
+ if (args[0] == null) {
+ return;
+ }
+ allMethods.get(UDAF_RESET_FUNCTION).invoke(udf, args);
+ } catch (Exception e) {
+ LOG.warn("invoke reset function meet some error: " + e.getCause().toString());
+ throw new UdfRuntimeException("UDAF failed to reset: ", e);
+ }
+ }
+
/**
* invoke merge function and it's have done deserialze.
* here call deserialize first, and call merge.
@@ -143,8 +171,14 @@ public class UdafExecutor extends BaseExecutor {
allMethods.get(UDAF_DESERIALIZE_FUNCTION).invoke(udf, args);
args[1] = args[0];
Long curPlace = place;
- stateObjMap.putIfAbsent(curPlace, createAggState());
- args[0] = stateObjMap.get(curPlace);
+ Object state = stateObjMap.get(curPlace);
+ if (state != null) {
+ args[0] = state;
+ } else {
+ Object newState = createAggState();
+ stateObjMap.put(curPlace, newState);
+ args[0] = newState;
+ }
allMethods.get(UDAF_MERGE_FUNCTION).invoke(udf, args);
} catch (Exception e) {
throw new UdfRuntimeException("UDAF failed to merge: ", e);
@@ -156,6 +190,9 @@ public class UdafExecutor extends BaseExecutor {
*/
public boolean getValue(long row, long place) throws UdfRuntimeException {
try {
+ if (stateObjMap.get(place) == null) {
+ stateObjMap.put(place, createAggState());
+ }
return storeUdfResult(allMethods.get(UDAF_RESULT_FUNCTION).invoke(udf, stateObjMap.get((Long) place)),
row, retClass);
} catch (Exception e) {
@@ -212,6 +249,7 @@ public class UdafExecutor extends BaseExecutor {
case UDAF_CREATE_FUNCTION:
case UDAF_MERGE_FUNCTION:
case UDAF_SERIALIZE_FUNCTION:
+ case UDAF_RESET_FUNCTION:
case UDAF_DESERIALIZE_FUNCTION: {
allMethods.put(methods[idx].getName(), methods[idx]);
break;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org