Posted to dev@rocketmq.apache.org by GitBox <gi...@apache.org> on 2022/10/09 02:53:44 UTC

[GitHub] [rocketmq-flink] SteNicholas commented on a diff in pull request #62: [features] Add rich initialization modes

SteNicholas commented on code in PR #62:
URL: https://github.com/apache/rocketmq-flink/pull/62#discussion_r990722626


##########
README.md:
##########
@@ -123,13 +128,40 @@ The following configurations are all from the class `org.apache.rocketmq.flink.l
 | consumer.group | consumer group *Required*     |    null |
 | consumer.topic | consumer topic *Required*       |    null |
 | consumer.tag | consumer topic tag      |    * |
-| consumer.offset.reset.to | what to do when there is no initial offset on the server      |   latest/earliest/timestamp |
-| consumer.offset.from.timestamp | the timestamp when `consumer.offset.reset.to=timestamp` was set   |   `System.currentTimeMillis()` |
 | consumer.offset.persist.interval | auto commit offset interval      |    5000 |
 | consumer.pull.thread.pool.size | consumer pull thread pool size      |    20 |
 | consumer.batch.size | consumer messages batch size      |    32 |
 | consumer.delay.when.message.not.found | the delay time when messages were not found      |    10 |
 
+### Consumer From Where

Review Comment:
   ```suggestion
   ### Consumer Strategy
   ```



##########
pom.xml:
##########
@@ -38,6 +38,8 @@
         <commons-lang.version>2.6</commons-lang.version>
         <spotless.version>2.4.2</spotless.version>
         <jaxb-api.version>2.3.1</jaxb-api.version>
+        <!-- rocketmq-schema-registry need to compile by yourself for the time being -->

Review Comment:
   Please remove the comment about the rocketmq-schema-registry Maven dependency. The rocketmq-schema-registry artifact is being deployed to the Maven repository, so it no longer needs to be compiled locally.



##########
README.md:
##########
@@ -123,13 +128,40 @@ The following configurations are all from the class `org.apache.rocketmq.flink.l
 | consumer.group | consumer group *Required*     |    null |
 | consumer.topic | consumer topic *Required*       |    null |
 | consumer.tag | consumer topic tag      |    * |
-| consumer.offset.reset.to | what to do when there is no initial offset on the server      |   latest/earliest/timestamp |
-| consumer.offset.from.timestamp | the timestamp when `consumer.offset.reset.to=timestamp` was set   |   `System.currentTimeMillis()` |
 | consumer.offset.persist.interval | auto commit offset interval      |    5000 |
 | consumer.pull.thread.pool.size | consumer pull thread pool size      |    20 |
 | consumer.batch.size | consumer messages batch size      |    32 |
 | consumer.delay.when.message.not.found | the delay time when messages were not found      |    10 |
 
+### Consumer From Where
+
+```java
+RocketMQSourceFunction<String> source = new RocketMQSourceFunction<>(
+        new SimpleStringDeserializationSchema(), props);
+HashMap<MessageQueue, Long> brokerMap = new HashMap<>();
+brokerMap.put(new MessageQueue("tp_driver_tag_sync_back", "broker-a", 1), 201L);
+brokerMap.put(new MessageQueue("tp_driver_tag_sync_back", "broker-c", 3), 123L);
+source.setStartFromSpecificOffsets(brokerMap);
+```
+RocketMQSourceFunction offer five initialization policies 
+* setStartFromEarliest
+* setStartFromLatest
+* setStartFromTimeStamp with timestamp
+* setStartFromGroupOffsets with `OffsetResetStrategy`
+* setStartFromSpecificOffsets
+
+| STRATEGY                            | DESCRIPTION                                                  |
+| ----------------------------------- | ------------------------------------------------------------ |
+| StartFromEarliest                   | consume from the earliest offset after restart with no state |
+| StartFromLatest                     | consume from the latest offset after restart with no state   |
+| StartFromTimeStamp                  | consume from the closest timestamp of data in each broker's queue |
+| StartFromGroupOffsets with LATEST   | If broker has the committed offset then consume from the next else consume from the latest offset |
+| StartFromGroupOffsets with EARLIEST | If broker has the committed offset ,consume from the next ,otherwise consume from the earlist offset.It's useful when server  expand  broker |
+| StartFromSpecificOffsets            | consumer from the specificOffsets in broker's queues.Group offsets will be returned from those broker's queues whose didn't be specified |
+
+**Attention**
+
+Only if flink job start with none state ,those policies is effective.If job recover from checkpoint,offset would init from the stored data.

Review Comment:
   ```suggestion
   These strategies take effect only if the Flink job starts without state. If the job recovers from a checkpoint, the offsets are initialized from the stored data.
   ```



##########
README.md:
##########
@@ -123,13 +128,40 @@ The following configurations are all from the class `org.apache.rocketmq.flink.l
 | consumer.group | consumer group *Required*     |    null |
 | consumer.topic | consumer topic *Required*       |    null |
 | consumer.tag | consumer topic tag      |    * |
-| consumer.offset.reset.to | what to do when there is no initial offset on the server      |   latest/earliest/timestamp |
-| consumer.offset.from.timestamp | the timestamp when `consumer.offset.reset.to=timestamp` was set   |   `System.currentTimeMillis()` |
 | consumer.offset.persist.interval | auto commit offset interval      |    5000 |
 | consumer.pull.thread.pool.size | consumer pull thread pool size      |    20 |
 | consumer.batch.size | consumer messages batch size      |    32 |
 | consumer.delay.when.message.not.found | the delay time when messages were not found      |    10 |
 
+### Consumer From Where
+
+```java
+RocketMQSourceFunction<String> source = new RocketMQSourceFunction<>(
+        new SimpleStringDeserializationSchema(), props);
+HashMap<MessageQueue, Long> brokerMap = new HashMap<>();
+brokerMap.put(new MessageQueue("tp_driver_tag_sync_back", "broker-a", 1), 201L);
+brokerMap.put(new MessageQueue("tp_driver_tag_sync_back", "broker-c", 3), 123L);
+source.setStartFromSpecificOffsets(brokerMap);
+```
+RocketMQSourceFunction offer five initialization policies 
+* setStartFromEarliest
+* setStartFromLatest
+* setStartFromTimeStamp with timestamp
+* setStartFromGroupOffsets with `OffsetResetStrategy`
+* setStartFromSpecificOffsets
+
+| STRATEGY                            | DESCRIPTION                                                  |
+| ----------------------------------- | ------------------------------------------------------------ |
+| StartFromEarliest                   | consume from the earliest offset after restart with no state |

Review Comment:
   It would be better to introduce a strategy enum for these strategies.
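
   For illustration, such an enum could look roughly like the sketch below. The name `StartupStrategy`, its constants, and the `parse` helper are hypothetical and are not taken from this PR; the descriptions only paraphrase the README table above.

   ```java
   import java.util.Locale;

   /** Hypothetical enum for the five initialization strategies; names are illustrative only. */
   enum StartupStrategy {
       EARLIEST("consume from the earliest offset when there is no state"),
       LATEST("consume from the latest offset when there is no state"),
       TIMESTAMP("consume from the offset closest to a given timestamp in each queue"),
       GROUP_OFFSETS("consume from the committed group offset, else fall back to a reset strategy"),
       SPECIFIC_OFFSETS("consume from explicitly specified per-queue offsets");

       private final String description;

       StartupStrategy(String description) {
           this.description = description;
       }

       String description() {
           return description;
       }

       /** Parses a user-facing value such as "group-offsets" (assumed format) into a constant. */
       static StartupStrategy parse(String value) {
           return valueOf(value.trim().toUpperCase(Locale.ROOT).replace('-', '_'));
       }
   }
   ```

   With an enum like this, the README table could list the constants directly, and the source function could switch on a single field instead of tracking several flags.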



##########
src/main/java/org/apache/rocketmq/flink/legacy/RocketMQSourceFunction.java:
##########
@@ -214,22 +227,35 @@ public void open(Configuration parameters) throws Exception {
                 getRuntimeContext()
                         .getMetricGroup()
                         .meter(MetricUtils.METRICS_TPS, new MeterView(outputCounter, 60));
-    }
 
-    @Override
-    public void run(SourceContext context) throws Exception {
-        String sql = props.getProperty(RocketMQConfig.CONSUMER_SQL);
-        String tag =
-                props.getProperty(RocketMQConfig.CONSUMER_TAG, RocketMQConfig.DEFAULT_CONSUMER_TAG);
-        int pullBatchSize = getInteger(props, CONSUMER_BATCH_SIZE, DEFAULT_CONSUMER_BATCH_SIZE);
+        getRuntimeContext()
+                .getMetricGroup()
+                .gauge(MetricUtils.CURRENT_FETCH_EVENT_TIME_LAG, fetchDelay);
+        getRuntimeContext()
+                .getMetricGroup()
+                .gauge(MetricUtils.CURRENT_EMIT_EVENT_TIME_LAG, emitDelay);
 
         final RuntimeContext ctx = getRuntimeContext();
         // The lock that guarantees that record emission and state updates are atomic,
         // from the view of taking a checkpoint.
         int taskNumber = ctx.getNumberOfParallelSubtasks();
         int taskIndex = ctx.getIndexOfThisSubtask();
         log.info("Source run, NumberOfTotalTask={}, IndexOfThisSubTask={}", taskNumber, taskIndex);
+        Collection<MessageQueue> totalQueues = consumer.fetchSubscribeMessageQueues(topic);
+        messageQueues =
+                RocketMQUtils.allocate(totalQueues, taskNumber, ctx.getIndexOfThisSubtask());
+        // if job recover from state,The state already contains offsets of last commit.

Review Comment:
   ```suggestion
           // If the job recovers from state, the state already contains the offsets of the last commit.
   ```
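
   To make the intended precedence concrete, here is a minimal, self-contained sketch of "restored state wins, otherwise the configured strategy decides". It is not the PR's implementation: the class `InitialOffsets`, the use of plain `String` queue keys instead of `MessageQueue`, and the per-queue fallback for queues missing from the restored state are all assumptions made for illustration.

   ```java
   import java.util.HashMap;
   import java.util.Map;
   import java.util.function.ToLongFunction;

   /** Hypothetical helper; not part of rocketmq-flink. */
   final class InitialOffsets {
       private InitialOffsets() {}

       /**
        * Resolves the starting offset for each assigned queue.
        *
        * @param assignedQueues queues allocated to this subtask (String keys are an assumption)
        * @param restored offsets restored from checkpoint state; empty on a fresh start
        * @param strategy fallback used only for queues without a restored offset
        */
       static Map<String, Long> resolve(
               Iterable<String> assignedQueues,
               Map<String, Long> restored,
               ToLongFunction<String> strategy) {
           Map<String, Long> result = new HashMap<>();
           for (String queue : assignedQueues) {
               Long fromState = restored.get(queue);
               // Restored offsets take precedence; the strategy applies only when state is absent.
               result.put(queue, fromState != null ? fromState : strategy.applyAsLong(queue));
           }
           return result;
       }
   }
   ```

   In this sketch a fresh job passes an empty `restored` map, so every queue goes through the strategy, while a job recovered from a checkpoint ignores the strategy for every queue already present in the state.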



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
