You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2021/08/24 11:18:45 UTC

[rocketmq-flink] 32/33: RocketMQSource improves the message consume of RocketMQPartitionSplitReader (#791)

This is an automated email from the ASF dual-hosted git repository.

duhengforever pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-flink.git

commit aae7134f56c38963711488331595ac1888d28558
Author: SteNicholas <pr...@163.com>
AuthorDate: Tue Aug 24 15:48:20 2021 +0800

    RocketMQSource improves the message consume of RocketMQPartitionSplitReader (#791)
---
 .../flink/source/reader/RocketMQPartitionSplitReader.java      | 10 +++++++---
 .../rocketmq/flink/source/split/RocketMQPartitionSplit.java    |  7 ++++---
 2 files changed, 11 insertions(+), 6 deletions(-)

diff --git a/src/main/java/org/apache/rocketmq/flink/source/reader/RocketMQPartitionSplitReader.java b/src/main/java/org/apache/rocketmq/flink/source/reader/RocketMQPartitionSplitReader.java
index 41fbbea..1846114 100644
--- a/src/main/java/org/apache/rocketmq/flink/source/reader/RocketMQPartitionSplitReader.java
+++ b/src/main/java/org/apache/rocketmq/flink/source/reader/RocketMQPartitionSplitReader.java
@@ -133,7 +133,7 @@ public class RocketMQPartitionSplitReader<T>
                                         ? consumer.searchOffset(messageQueue, startTime)
                                         : startOffset;
                     } catch (MQClientException e) {
-                        LOG.error(
+                        LOG.warn(
                                 String.format(
                                         "Search RocketMQ message offset of topic[%s] broker[%s] queue[%d] exception.",
                                         messageQueue.getTopic(),
@@ -159,13 +159,13 @@ public class RocketMQPartitionSplitReader<T>
                         return recordsBySplits;
                     }
                     pullResult =
-                            consumer.pullBlockIfNotFound(
+                            consumer.pull(
                                     messageQueue, tag, messageOffset, MAX_MESSAGE_NUMBER_PER_BLOCK);
                 } catch (MQClientException
                         | RemotingException
                         | MQBrokerException
                         | InterruptedException e) {
-                    LOG.error(
+                    LOG.warn(
                             String.format(
                                     "Pull RocketMQ messages of topic[%s] broker[%s] queue[%d] tag[%s] from offset[%d] exception.",
                                     messageQueue.getTopic(),
@@ -222,6 +222,10 @@ public class RocketMQPartitionSplitReader<T>
             }
         }
         recordsBySplits.prepareForRead();
+        LOG.debug(
+                String.format(
+                        "Fetch record splits for MetaQ subscribe message queues of topic[%s].",
+                        topic));
         return recordsBySplits;
     }
 
diff --git a/src/main/java/org/apache/rocketmq/flink/source/split/RocketMQPartitionSplit.java b/src/main/java/org/apache/rocketmq/flink/source/split/RocketMQPartitionSplit.java
index 9bda60f..5717767 100644
--- a/src/main/java/org/apache/rocketmq/flink/source/split/RocketMQPartitionSplit.java
+++ b/src/main/java/org/apache/rocketmq/flink/source/split/RocketMQPartitionSplit.java
@@ -73,13 +73,13 @@ public class RocketMQPartitionSplit implements SourceSplit {
     @Override
     public String toString() {
         return String.format(
-                "[Topic: %s, Partition: %s, StartingOffset: %d, StoppingTimestamp: %d]",
-                topic, partition, startingOffset, stoppingTimestamp);
+                "[Topic: %s, Broker: %s, Partition: %s, StartingOffset: %d, StoppingTimestamp: %d]",
+                topic, broker, partition, startingOffset, stoppingTimestamp);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(topic, partition, startingOffset, stoppingTimestamp);
+        return Objects.hash(topic, broker, partition, startingOffset, stoppingTimestamp);
     }
 
     @Override
@@ -89,6 +89,7 @@ public class RocketMQPartitionSplit implements SourceSplit {
         }
         RocketMQPartitionSplit other = (RocketMQPartitionSplit) obj;
         return topic.equals(other.topic)
+                && broker.equals(other.broker)
                 && partition == other.partition
                 && startingOffset == other.startingOffset
                 && stoppingTimestamp == other.stoppingTimestamp;