You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2021/08/24 11:18:45 UTC
[rocketmq-flink] 32/33: RocketMQSource improves the message consume
of RocketMQPartitionSplitReader (#791)
This is an automated email from the ASF dual-hosted git repository.
duhengforever pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-flink.git
commit aae7134f56c38963711488331595ac1888d28558
Author: SteNicholas <pr...@163.com>
AuthorDate: Tue Aug 24 15:48:20 2021 +0800
RocketMQSource improves the message consume of RocketMQPartitionSplitReader (#791)
---
.../flink/source/reader/RocketMQPartitionSplitReader.java | 10 +++++++---
.../rocketmq/flink/source/split/RocketMQPartitionSplit.java | 7 ++++---
2 files changed, 11 insertions(+), 6 deletions(-)
diff --git a/src/main/java/org/apache/rocketmq/flink/source/reader/RocketMQPartitionSplitReader.java b/src/main/java/org/apache/rocketmq/flink/source/reader/RocketMQPartitionSplitReader.java
index 41fbbea..1846114 100644
--- a/src/main/java/org/apache/rocketmq/flink/source/reader/RocketMQPartitionSplitReader.java
+++ b/src/main/java/org/apache/rocketmq/flink/source/reader/RocketMQPartitionSplitReader.java
@@ -133,7 +133,7 @@ public class RocketMQPartitionSplitReader<T>
? consumer.searchOffset(messageQueue, startTime)
: startOffset;
} catch (MQClientException e) {
- LOG.error(
+ LOG.warn(
String.format(
"Search RocketMQ message offset of topic[%s] broker[%s] queue[%d] exception.",
messageQueue.getTopic(),
@@ -159,13 +159,13 @@ public class RocketMQPartitionSplitReader<T>
return recordsBySplits;
}
pullResult =
- consumer.pullBlockIfNotFound(
+ consumer.pull(
messageQueue, tag, messageOffset, MAX_MESSAGE_NUMBER_PER_BLOCK);
} catch (MQClientException
| RemotingException
| MQBrokerException
| InterruptedException e) {
- LOG.error(
+ LOG.warn(
String.format(
"Pull RocketMQ messages of topic[%s] broker[%s] queue[%d] tag[%s] from offset[%d] exception.",
messageQueue.getTopic(),
@@ -222,6 +222,10 @@ public class RocketMQPartitionSplitReader<T>
}
}
recordsBySplits.prepareForRead();
+ LOG.debug(
+ String.format(
+ "Fetch record splits for MetaQ subscribe message queues of topic[%s].",
+ topic));
return recordsBySplits;
}
diff --git a/src/main/java/org/apache/rocketmq/flink/source/split/RocketMQPartitionSplit.java b/src/main/java/org/apache/rocketmq/flink/source/split/RocketMQPartitionSplit.java
index 9bda60f..5717767 100644
--- a/src/main/java/org/apache/rocketmq/flink/source/split/RocketMQPartitionSplit.java
+++ b/src/main/java/org/apache/rocketmq/flink/source/split/RocketMQPartitionSplit.java
@@ -73,13 +73,13 @@ public class RocketMQPartitionSplit implements SourceSplit {
@Override
public String toString() {
return String.format(
- "[Topic: %s, Partition: %s, StartingOffset: %d, StoppingTimestamp: %d]",
- topic, partition, startingOffset, stoppingTimestamp);
+ "[Topic: %s, Broker: %s, Partition: %s, StartingOffset: %d, StoppingTimestamp: %d]",
+ topic, broker, partition, startingOffset, stoppingTimestamp);
}
@Override
public int hashCode() {
- return Objects.hash(topic, partition, startingOffset, stoppingTimestamp);
+ return Objects.hash(topic, broker, partition, startingOffset, stoppingTimestamp);
}
@Override
@@ -89,6 +89,7 @@ public class RocketMQPartitionSplit implements SourceSplit {
}
RocketMQPartitionSplit other = (RocketMQPartitionSplit) obj;
return topic.equals(other.topic)
+ && broker.equals(other.broker)
&& partition == other.partition
&& startingOffset == other.startingOffset
&& stoppingTimestamp == other.stoppingTimestamp;