You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2021/08/24 11:18:38 UTC
[rocketmq-flink] 25/33: [rocketmq-connector-flink] rebalance cause
offset rollback to long time ago (#672)
This is an automated email from the ASF dual-hosted git repository.
duhengforever pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-flink.git
commit da92a64a12f4de3b352f29182a4565cc4ac23ae4
Author: JerryTaoTao <27...@qq.com>
AuthorDate: Thu Jan 21 19:19:19 2021 +0800
[rocketmq-connector-flink] rebalance cause offset rollback to long time ago (#672)
Co-authored-by: hzyuemeng1 <hz...@corp.netease.com>
---
.../org/apache/rocketmq/flink/RocketMQSource.java | 20 ++++++++++++++++----
1 file changed, 16 insertions(+), 4 deletions(-)
diff --git a/src/main/java/org/apache/rocketmq/flink/RocketMQSource.java b/src/main/java/org/apache/rocketmq/flink/RocketMQSource.java
index 35c5122..72783a8 100644
--- a/src/main/java/org/apache/rocketmq/flink/RocketMQSource.java
+++ b/src/main/java/org/apache/rocketmq/flink/RocketMQSource.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
+import org.apache.flink.util.Preconditions;
import org.apache.flink.metrics.Meter;
import org.apache.flink.metrics.MeterView;
import org.apache.flink.metrics.SimpleCounter;
@@ -120,6 +121,10 @@ public class RocketMQSource<OUT> extends RichParallelSourceFunction<OUT>
if (restoredOffsets == null) {
restoredOffsets = new ConcurrentHashMap<>();
}
+
+ //use restoredOffsets to init offset table.
+ initOffsetTableFromRestoredOffsets();
+
if (pendingOffsetsToCommit == null) {
pendingOffsetsToCommit = new LinkedMap();
}
@@ -252,13 +257,10 @@ public class RocketMQSource<OUT> extends RichParallelSourceFunction<OUT>
Long offset = offsetTable.get(mq);
// restoredOffsets(unionOffsetStates) is the restored global union state;
// should only snapshot mqs that actually belong to us
- if (restored && offset == null) {
- offset = restoredOffsets.get(mq);
- }
if (offset == null) {
// fetchConsumeOffset from broker
offset = consumer.fetchConsumeOffset(mq, false);
- if (offset < 0) {
+ if (!restored || offset < 0) {
String initialOffset = props.getProperty(RocketMQConfig.CONSUMER_OFFSET_RESET_TO, CONSUMER_OFFSET_LATEST);
switch (initialOffset) {
case CONSUMER_OFFSET_EARLIEST:
@@ -318,6 +320,16 @@ public class RocketMQSource<OUT> extends RichParallelSourceFunction<OUT>
}
}
+ public void initOffsetTableFromRestoredOffsets() {
+ Preconditions.checkNotNull(restoredOffsets, "restoredOffsets can't be null");
+ restoredOffsets.forEach((mq, offset) -> {
+ if (!offsetTable.containsKey(mq) || offsetTable.get(mq) < offset) {
+ offsetTable.put(mq, offset);
+ }
+ });
+ log.info("init offset table from restoredOffsets successful.", offsetTable);
+ }
+
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
// called when a snapshot for a checkpoint is requested