You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by vo...@apache.org on 2022/04/11 01:45:07 UTC

[rocketmq-streams] branch main updated: bugfix list - adjust test code in WindowTrigger in case the window which size unit is minute - adjust register logic in session window operator - fix issue of window value can't be delete after fired

This is an automated email from the ASF dual-hosted git repository.

vongosling pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-streams.git


The following commit(s) were added to refs/heads/main by this push:
     new 8e093c0  bugfix list - adjust test code in WindowTrigger in case the window which size unit is minute - adjust register logic in session window operator - fix issue of window value can't be delete after fired
     new 23bb048  Merge pull request #125 from speak2me/bugfix_20220113
8e093c0 is described below

commit 8e093c0d3a0ac44ab8d5f04b78e7b26d2c6719f1
Author: write2me <wr...@vip.qq.com>
AuthorDate: Thu Jan 13 21:12:29 2022 +0800

    bugfix list
    - adjust test code in WindowTrigger in case the window which size unit is minute
    - adjust register logic in session window operator
    - fix issue of window value can't be delete after fired
---
 .../rocketmq/streams/common/utils/SQLUtil.java     |  2 +-
 .../window/offset/WindowMaxValueProcessor.java     |  2 +-
 .../window/operator/impl/SessionOperator.java      | 33 ++++++++--------------
 .../streams/window/trigger/WindowTrigger.java      | 10 ++++---
 4 files changed, 20 insertions(+), 27 deletions(-)

diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/utils/SQLUtil.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/utils/SQLUtil.java
index b1676b9..7bce3cb 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/utils/SQLUtil.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/utils/SQLUtil.java
@@ -577,7 +577,7 @@ public class SQLUtil {
         buffer.append(" ");
         for (int index = 0; index < keywordList.size(); index++) {
             Pair<String, String> pair = keywordList.get(index);
-            buffer.append(pair.getKey() + " like '" + pair.getValue() + "'");
+            buffer.append(pair.getKey() + " like '" + pair.getValue() + "%'");
             if (index != (keywordList.size() - 1)) {
                 buffer.append(" or ");
             }
diff --git a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/offset/WindowMaxValueProcessor.java b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/offset/WindowMaxValueProcessor.java
index 4aa86ae..cf09bf4 100644
--- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/offset/WindowMaxValueProcessor.java
+++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/offset/WindowMaxValueProcessor.java
@@ -127,7 +127,7 @@ public class WindowMaxValueProcessor {
         }
 
         String keyPrefix = MapKeyUtil.createKey(name, splitId);
-        String sql = "select * from " + ORMUtil.getTableName(WindowMaxValue.class) + " where configure_name like '%" + name + "%' and partition like '%" + splitId + "%'";
+        String sql="select * from "+ ORMUtil.getTableName(WindowMaxValue.class)+ " where msg_key like '"+keyPrefix+"%'";
         List<WindowMaxValue> windowMaxValues = ORMUtil.queryForList(sql, null, WindowMaxValue.class);
         if (windowMaxValues == null || windowMaxValues.size() == 0) {
             return result;
diff --git a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/impl/SessionOperator.java b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/impl/SessionOperator.java
index 7b27e12..e9aa617 100644
--- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/impl/SessionOperator.java
+++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/impl/SessionOperator.java
@@ -386,13 +386,6 @@ public class SessionOperator extends WindowOperator {
             //get iterator sorted by fire time
             WindowBaseValueIterator<WindowValue> it = storage.loadWindowInstanceSplitData(getOrderBypPrefix(), queueId, windowInstance.createWindowInstanceId(), null, getWindowBaseValueClass());
             //
-            if (queueId2Offset != null) {
-                String offset = queueId2Offset.get(queueId);
-                if (StringUtil.isNotEmpty(offset)) {
-                    it.setPartitionNum(Long.valueOf(offset));
-                }
-            }
-            //
             Long currentFireTime = DateUtil.parse(windowInstance.getFireTime(), SESSION_DATETIME_PATTERN).getTime();
             Long nextFireTime = currentFireTime + 1000 * 60 * 1;
             List<WindowValue> toFireValueList = new ArrayList<>();
@@ -412,8 +405,18 @@ public class SessionOperator extends WindowOperator {
                     }
                 }
             }
-            doFire(queueId, windowInstance, toFireValueList, currentFireTime, nextFireTime);
+            doFire(queueId, windowInstance, toFireValueList);
             //
+            if (!nextFireTime.equals(currentFireTime)) {
+                String instanceId = windowInstance.createWindowInstanceId();
+                WindowInstance existedWindowInstance = searchWindowInstance(instanceId);
+                if (existedWindowInstance != null) {
+                    existedWindowInstance.setFireTime(DateUtil.format(new Date(nextFireTime)));
+                    windowFireSource.registFireWindowInstanceIfNotExist(windowInstance, this);
+                } else {
+                    LOG.error("window instance lost, queueId: " + queueId + " ,fire time" + windowInstance.getFireTime());
+                }
+            }
             return toFireValueList.size();
         }
 
@@ -432,25 +435,13 @@ public class SessionOperator extends WindowOperator {
         return false;
     }
 
-    private void doFire(String queueId, WindowInstance instance, List<WindowValue> valueList, Long currentFireTime,
-        Long nextFireTime) {
+    private void doFire(String queueId, WindowInstance instance, List<WindowValue> valueList) {
         if (CollectionUtil.isEmpty(valueList)) {
             return;
         }
         valueList.sort(Comparator.comparingLong(WindowBaseValue::getPartitionNum));
         sendFireMessage(valueList, queueId);
         clearWindowValues(valueList, queueId, instance);
-        //
-        if (!nextFireTime.equals(currentFireTime)) {
-            String instanceId = instance.createWindowInstanceId();
-            WindowInstance existedWindowInstance = searchWindowInstance(instanceId);
-            if (existedWindowInstance != null) {
-                existedWindowInstance.setFireTime(DateUtil.format(new Date(nextFireTime)));
-                windowFireSource.registFireWindowInstanceIfNotExist(instance, this);
-            } else {
-                LOG.error("window instance lost, queueId: " + queueId + " ,fire time" + instance.getFireTime());
-            }
-        }
     }
 
     /**
diff --git a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/trigger/WindowTrigger.java b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/trigger/WindowTrigger.java
index 3b9410d..a872f3a 100644
--- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/trigger/WindowTrigger.java
+++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/trigger/WindowTrigger.java
@@ -248,10 +248,12 @@ public class WindowTrigger extends AbstractSupportShuffleSource implements IStre
         if (eventTimeLastUpdateTime == null) {
             return new FireResult();
         }
-        int gap = (int) (System.currentTimeMillis() - eventTimeLastUpdateTime);
-        if (window.getMsgMaxGapSecond() != null && gap > window.getMsgMaxGapSecond() * 1000) {
-            LOG.warn("the fire reason is exceed the gap " + gap + " window instance id is " + windowInstanceTriggerId);
-            return new FireResult(true, 1);
+        if (isTest) {
+            int gap = (int) (System.currentTimeMillis() - eventTimeLastUpdateTime);
+            if (window.getMsgMaxGapSecond() != null && gap > window.getMsgMaxGapSecond() * 1000) {
+                LOG.warn("the fire reason is exceed the gap " + gap + " window instance id is " + windowInstanceTriggerId);
+                return new FireResult(true, 1);
+            }
         }
         return new FireResult();
     }