You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ka...@apache.org on 2023/05/08 01:28:15 UTC
[rocketmq-streams] branch develop updated: fix(state) fix bug: npl when rebalance
This is an automated email from the ASF dual-hosted git repository.
karp pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq-streams.git
The following commit(s) were added to refs/heads/develop by this push:
new 036f92ec fix(state) fix bug: npl when rebalance
new 0a735fb8 Merge pull request #288 from ni-ze/develop
036f92ec is described below
commit 036f92ec909632d566215eb2918e07a5e60e5fdb
Author: 维章 <ni...@alibaba-inc.com>
AuthorDate: Thu May 4 10:15:58 2023 +0800
fix(state) fix bug: npl when rebalance
---
.../org/apache/rocketmq/streams/core/state/AbstractStore.java | 10 +++++-----
1 file changed, 5 insertions(+), 5 deletions(-)
diff --git a/core/src/main/java/org/apache/rocketmq/streams/core/state/AbstractStore.java b/core/src/main/java/org/apache/rocketmq/streams/core/state/AbstractStore.java
index e7aab918..7b593348 100644
--- a/core/src/main/java/org/apache/rocketmq/streams/core/state/AbstractStore.java
+++ b/core/src/main/java/org/apache/rocketmq/streams/core/state/AbstractStore.java
@@ -145,12 +145,12 @@ public abstract class AbstractStore {
}
public Set<byte[]> getInCalculating(String stateTopicQueue) {
- return calculating.get(stateTopicQueue);
+ return calculating.getOrDefault(stateTopicQueue, new HashSet<>());
}
public Set<byte[]> getAll(String stateTopicQueue) {
- Set<byte[]> calculating = this.calculating.get(stateTopicQueue);
- Set<byte[]> recover = this.recover.get(stateTopicQueue);
+ Set<byte[]> calculating = this.calculating.getOrDefault(stateTopicQueue, new HashSet<>());
+ Set<byte[]> recover = this.recover.getOrDefault(stateTopicQueue, new HashSet<>());
Set<byte[]> result = new HashSet<>();
result.addAll(calculating);
@@ -162,7 +162,7 @@ public abstract class AbstractStore {
public String whichStateTopicQueueBelongTo(byte[] key) {
for (String uniqueQueue : recover.keySet()) {
- for (byte[] tempKeyByte : recover.get(uniqueQueue)) {
+ for (byte[] tempKeyByte : recover.getOrDefault(uniqueQueue, new HashSet<>())) {
if (Arrays.equals(tempKeyByte, key)) {
return uniqueQueue;
}
@@ -170,7 +170,7 @@ public abstract class AbstractStore {
}
for (String uniqueQueue : calculating.keySet()) {
- for (byte[] tempKeyByte : calculating.get(uniqueQueue)) {
+ for (byte[] tempKeyByte : calculating.getOrDefault(uniqueQueue, new HashSet<>())) {
if (Arrays.equals(tempKeyByte, key)) {
return uniqueQueue;
}