You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ka...@apache.org on 2023/05/08 01:28:15 UTC

[rocketmq-streams] branch develop updated: fix(state) fix bug: NPE when rebalancing

This is an automated email from the ASF dual-hosted git repository.

karp pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq-streams.git


The following commit(s) were added to refs/heads/develop by this push:
     new 036f92ec fix(state) fix bug: NPE when rebalancing
     new 0a735fb8 Merge pull request #288 from ni-ze/develop
036f92ec is described below

commit 036f92ec909632d566215eb2918e07a5e60e5fdb
Author: 维章 <ni...@alibaba-inc.com>
AuthorDate: Thu May 4 10:15:58 2023 +0800

    fix(state) fix bug: NPE when rebalancing
---
 .../org/apache/rocketmq/streams/core/state/AbstractStore.java  | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/core/src/main/java/org/apache/rocketmq/streams/core/state/AbstractStore.java b/core/src/main/java/org/apache/rocketmq/streams/core/state/AbstractStore.java
index e7aab918..7b593348 100644
--- a/core/src/main/java/org/apache/rocketmq/streams/core/state/AbstractStore.java
+++ b/core/src/main/java/org/apache/rocketmq/streams/core/state/AbstractStore.java
@@ -145,12 +145,12 @@ public abstract class AbstractStore {
         }
 
         public Set<byte[]> getInCalculating(String stateTopicQueue) {
-            return calculating.get(stateTopicQueue);
+            return calculating.getOrDefault(stateTopicQueue, new HashSet<>());
         }
 
         public Set<byte[]> getAll(String stateTopicQueue) {
-            Set<byte[]> calculating = this.calculating.get(stateTopicQueue);
-            Set<byte[]> recover = this.recover.get(stateTopicQueue);
+            Set<byte[]> calculating = this.calculating.getOrDefault(stateTopicQueue, new HashSet<>());
+            Set<byte[]> recover = this.recover.getOrDefault(stateTopicQueue, new HashSet<>());
 
             Set<byte[]> result = new HashSet<>();
             result.addAll(calculating);
@@ -162,7 +162,7 @@ public abstract class AbstractStore {
 
         public String whichStateTopicQueueBelongTo(byte[] key) {
             for (String uniqueQueue : recover.keySet()) {
-                for (byte[] tempKeyByte : recover.get(uniqueQueue)) {
+                for (byte[] tempKeyByte : recover.getOrDefault(uniqueQueue, new HashSet<>())) {
                     if (Arrays.equals(tempKeyByte, key)) {
                         return uniqueQueue;
                     }
@@ -170,7 +170,7 @@ public abstract class AbstractStore {
             }
 
             for (String uniqueQueue : calculating.keySet()) {
-                for (byte[] tempKeyByte : calculating.get(uniqueQueue)) {
+                for (byte[] tempKeyByte : calculating.getOrDefault(uniqueQueue, new HashSet<>())) {
                     if (Arrays.equals(tempKeyByte, key)) {
                         return uniqueQueue;
                     }