You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by vo...@apache.org on 2022/04/11 01:45:07 UTC
[rocketmq-streams] branch main updated: bugfix list - adjust test code in WindowTrigger for windows whose size unit is minutes - adjust registration logic in the session window operator - fix issue where window values can't be deleted after being fired
This is an automated email from the ASF dual-hosted git repository.
vongosling pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-streams.git
The following commit(s) were added to refs/heads/main by this push:
new 8e093c0 bugfix list - adjust test code in WindowTrigger for windows whose size unit is minutes - adjust registration logic in the session window operator - fix issue where window values can't be deleted after being fired
new 23bb048 Merge pull request #125 from speak2me/bugfix_20220113
8e093c0 is described below
commit 8e093c0d3a0ac44ab8d5f04b78e7b26d2c6719f1
Author: write2me <wr...@vip.qq.com>
AuthorDate: Thu Jan 13 21:12:29 2022 +0800
bugfix list
- adjust test code in WindowTrigger for windows whose size unit is minutes
- adjust registration logic in the session window operator
- fix issue where window values can't be deleted after being fired
---
.../rocketmq/streams/common/utils/SQLUtil.java | 2 +-
.../window/offset/WindowMaxValueProcessor.java | 2 +-
.../window/operator/impl/SessionOperator.java | 33 ++++++++--------------
.../streams/window/trigger/WindowTrigger.java | 10 ++++---
4 files changed, 20 insertions(+), 27 deletions(-)
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/utils/SQLUtil.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/utils/SQLUtil.java
index b1676b9..7bce3cb 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/utils/SQLUtil.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/utils/SQLUtil.java
@@ -577,7 +577,7 @@ public class SQLUtil {
buffer.append(" ");
for (int index = 0; index < keywordList.size(); index++) {
Pair<String, String> pair = keywordList.get(index);
- buffer.append(pair.getKey() + " like '" + pair.getValue() + "'");
+ buffer.append(pair.getKey() + " like '" + pair.getValue() + "%'");
if (index != (keywordList.size() - 1)) {
buffer.append(" or ");
}
diff --git a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/offset/WindowMaxValueProcessor.java b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/offset/WindowMaxValueProcessor.java
index 4aa86ae..cf09bf4 100644
--- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/offset/WindowMaxValueProcessor.java
+++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/offset/WindowMaxValueProcessor.java
@@ -127,7 +127,7 @@ public class WindowMaxValueProcessor {
}
String keyPrefix = MapKeyUtil.createKey(name, splitId);
- String sql = "select * from " + ORMUtil.getTableName(WindowMaxValue.class) + " where configure_name like '%" + name + "%' and partition like '%" + splitId + "%'";
+ String sql="select * from "+ ORMUtil.getTableName(WindowMaxValue.class)+ " where msg_key like '"+keyPrefix+"%'";
List<WindowMaxValue> windowMaxValues = ORMUtil.queryForList(sql, null, WindowMaxValue.class);
if (windowMaxValues == null || windowMaxValues.size() == 0) {
return result;
diff --git a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/impl/SessionOperator.java b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/impl/SessionOperator.java
index 7b27e12..e9aa617 100644
--- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/impl/SessionOperator.java
+++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/impl/SessionOperator.java
@@ -386,13 +386,6 @@ public class SessionOperator extends WindowOperator {
//get iterator sorted by fire time
WindowBaseValueIterator<WindowValue> it = storage.loadWindowInstanceSplitData(getOrderBypPrefix(), queueId, windowInstance.createWindowInstanceId(), null, getWindowBaseValueClass());
//
- if (queueId2Offset != null) {
- String offset = queueId2Offset.get(queueId);
- if (StringUtil.isNotEmpty(offset)) {
- it.setPartitionNum(Long.valueOf(offset));
- }
- }
- //
Long currentFireTime = DateUtil.parse(windowInstance.getFireTime(), SESSION_DATETIME_PATTERN).getTime();
Long nextFireTime = currentFireTime + 1000 * 60 * 1;
List<WindowValue> toFireValueList = new ArrayList<>();
@@ -412,8 +405,18 @@ public class SessionOperator extends WindowOperator {
}
}
}
- doFire(queueId, windowInstance, toFireValueList, currentFireTime, nextFireTime);
+ doFire(queueId, windowInstance, toFireValueList);
//
+ if (!nextFireTime.equals(currentFireTime)) {
+ String instanceId = windowInstance.createWindowInstanceId();
+ WindowInstance existedWindowInstance = searchWindowInstance(instanceId);
+ if (existedWindowInstance != null) {
+ existedWindowInstance.setFireTime(DateUtil.format(new Date(nextFireTime)));
+ windowFireSource.registFireWindowInstanceIfNotExist(windowInstance, this);
+ } else {
+ LOG.error("window instance lost, queueId: " + queueId + " ,fire time" + windowInstance.getFireTime());
+ }
+ }
return toFireValueList.size();
}
@@ -432,25 +435,13 @@ public class SessionOperator extends WindowOperator {
return false;
}
- private void doFire(String queueId, WindowInstance instance, List<WindowValue> valueList, Long currentFireTime,
- Long nextFireTime) {
+ private void doFire(String queueId, WindowInstance instance, List<WindowValue> valueList) {
if (CollectionUtil.isEmpty(valueList)) {
return;
}
valueList.sort(Comparator.comparingLong(WindowBaseValue::getPartitionNum));
sendFireMessage(valueList, queueId);
clearWindowValues(valueList, queueId, instance);
- //
- if (!nextFireTime.equals(currentFireTime)) {
- String instanceId = instance.createWindowInstanceId();
- WindowInstance existedWindowInstance = searchWindowInstance(instanceId);
- if (existedWindowInstance != null) {
- existedWindowInstance.setFireTime(DateUtil.format(new Date(nextFireTime)));
- windowFireSource.registFireWindowInstanceIfNotExist(instance, this);
- } else {
- LOG.error("window instance lost, queueId: " + queueId + " ,fire time" + instance.getFireTime());
- }
- }
}
/**
diff --git a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/trigger/WindowTrigger.java b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/trigger/WindowTrigger.java
index 3b9410d..a872f3a 100644
--- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/trigger/WindowTrigger.java
+++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/trigger/WindowTrigger.java
@@ -248,10 +248,12 @@ public class WindowTrigger extends AbstractSupportShuffleSource implements IStre
if (eventTimeLastUpdateTime == null) {
return new FireResult();
}
- int gap = (int) (System.currentTimeMillis() - eventTimeLastUpdateTime);
- if (window.getMsgMaxGapSecond() != null && gap > window.getMsgMaxGapSecond() * 1000) {
- LOG.warn("the fire reason is exceed the gap " + gap + " window instance id is " + windowInstanceTriggerId);
- return new FireResult(true, 1);
+ if (isTest) {
+ int gap = (int) (System.currentTimeMillis() - eventTimeLastUpdateTime);
+ if (window.getMsgMaxGapSecond() != null && gap > window.getMsgMaxGapSecond() * 1000) {
+ LOG.warn("the fire reason is exceed the gap " + gap + " window instance id is " + windowInstanceTriggerId);
+ return new FireResult(true, 1);
+ }
}
return new FireResult();
}