You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by li...@apache.org on 2022/12/07 02:37:27 UTC
[rocketmq-flink] branch main updated: [ISSUE #76] Fix bug when the job restore from ck (#77)
This is an automated email from the ASF dual-hosted git repository.
lizhimin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-flink.git
The following commit(s) were added to refs/heads/main by this push:
new 422909f [ISSUE #76] Fix bug when the job restore from ck (#77)
422909f is described below
commit 422909f352808b15f9eef1e699af0bd0fff17a88
Author: 高思伟 <48...@users.noreply.github.com>
AuthorDate: Wed Dec 7 10:37:22 2022 +0800
[ISSUE #76] Fix bug when the job restore from ck (#77)
Fix bug when the job restore from ck
Co-authored-by: 高思伟 <si...@amh-group.com>
---
.../apache/rocketmq/flink/legacy/RocketMQSourceFunction.java | 12 +++++-------
.../legacy/sourceFunction/RocketMQSourceFunctionTest.java | 3 ++-
2 files changed, 7 insertions(+), 8 deletions(-)
diff --git a/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQSourceFunction.java b/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQSourceFunction.java
index 97037a1..fa57839 100644
--- a/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQSourceFunction.java
+++ b/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQSourceFunction.java
@@ -176,10 +176,6 @@ public class RocketMQSourceFunction<OUT> extends RichParallelSourceFunction<OUT>
if (restoredOffsets == null) {
restoredOffsets = new ConcurrentHashMap<>();
}
-
- // use restoredOffsets to init offset table.
- initOffsetTableFromRestoredOffsets();
-
if (pendingOffsetsToCommit == null) {
pendingOffsetsToCommit = new LinkedMap();
}
@@ -250,7 +246,9 @@ public class RocketMQSourceFunction<OUT> extends RichParallelSourceFunction<OUT>
RocketMQUtils.allocate(totalQueues, taskNumber, ctx.getIndexOfThisSubtask());
// If the job recovers from the state, the state has already contained the offsets of last
// commit.
- if (!restored) {
+ if (restored) {
+ initOffsetTableFromRestoredOffsets(messageQueues);
+ } else {
initOffsets(messageQueues);
}
}
@@ -539,11 +537,11 @@ public class RocketMQSourceFunction<OUT> extends RichParallelSourceFunction<OUT>
}
}
- public void initOffsetTableFromRestoredOffsets() {
+ public void initOffsetTableFromRestoredOffsets(List<MessageQueue> messageQueues) {
Preconditions.checkNotNull(restoredOffsets, "restoredOffsets can't be null");
restoredOffsets.forEach(
(mq, offset) -> {
- if (!offsetTable.containsKey(mq) || offsetTable.get(mq) < offset) {
+ if (messageQueues.contains(mq)) {
offsetTable.put(mq, offset);
}
});
diff --git a/src/test/java/org/apache/rocketmq/flink/legacy/sourceFunction/RocketMQSourceFunctionTest.java b/src/test/java/org/apache/rocketmq/flink/legacy/sourceFunction/RocketMQSourceFunctionTest.java
index 8b7b44f..6ef73e6 100644
--- a/src/test/java/org/apache/rocketmq/flink/legacy/sourceFunction/RocketMQSourceFunctionTest.java
+++ b/src/test/java/org/apache/rocketmq/flink/legacy/sourceFunction/RocketMQSourceFunctionTest.java
@@ -27,6 +27,7 @@ import org.apache.rocketmq.flink.legacy.common.serialization.SimpleStringDeseria
import org.junit.Test;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
@@ -78,7 +79,7 @@ public class RocketMQSourceFunctionTest {
map.put(new MessageQueue("tpc", "broker-1", 1), 31L);
setFieldValue(source, "restoredOffsets", map);
setFieldValue(source, "offsetTable", new ConcurrentHashMap<>());
- source.initOffsetTableFromRestoredOffsets();
+ source.initOffsetTableFromRestoredOffsets(new ArrayList<>(map.keySet()));
Map<MessageQueue, Long> offsetTable = (Map) getFieldValue(source, "offsetTable");
for (Map.Entry<MessageQueue, Long> entry : map.entrySet()) {
assertEquals(offsetTable.containsKey(entry.getKey()), true);