You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@rocketmq.apache.org by GitBox <gi...@apache.org> on 2021/11/05 09:14:32 UTC

[GitHub] [rocketmq-flink] wangyongchun8888 opened a new issue #19: Directly created DefaultMQPullConsumer can't update offset to broker in notifyCheckpointComplete function

wangyongchun8888 opened a new issue #19:
URL: https://github.com/apache/rocketmq-flink/issues/19


   package com.zto.route.flink.rocketmq;
   
   import com.zto.route.flink.rocketmq.common.util.MetricUtils;
   import com.zto.route.flink.rocketmq.common.util.RetryUtil;
   import com.zto.route.flink.rocketmq.common.watermark.WaterMarkForAll;
   import com.zto.route.flink.rocketmq.common.watermark.WaterMarkPerQueue;
   import org.apache.commons.lang3.Validate;
   import org.apache.flink.api.common.functions.RuntimeContext;
   import org.apache.flink.api.common.state.CheckpointListener;
   import org.apache.flink.api.common.state.ListState;
   import org.apache.flink.api.common.state.ListStateDescriptor;
   import org.apache.flink.api.common.typeinfo.TypeHint;
   import org.apache.flink.api.common.typeinfo.TypeInformation;
   import org.apache.flink.api.java.tuple.Tuple2;
   import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
   import org.apache.flink.configuration.Configuration;
   import org.apache.flink.metrics.Counter;
   import org.apache.flink.metrics.Meter;
   import org.apache.flink.metrics.MeterView;
   import org.apache.flink.metrics.SimpleCounter;
   import org.apache.flink.runtime.state.FunctionInitializationContext;
   import org.apache.flink.runtime.state.FunctionSnapshotContext;
   import org.apache.flink.shaded.curator4.org.apache.curator.shaded.com.google.common.util.concurrent.ThreadFactoryBuilder;
   import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
   import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
   import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
   import org.apache.flink.util.Collector;
   import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
   import org.apache.rocketmq.client.exception.MQClientException;
   import org.apache.rocketmq.common.message.MessageExt;
   import org.apache.rocketmq.common.message.MessageQueue;
   import org.slf4j.Logger;
   import org.slf4j.LoggerFactory;
   
   import java.lang.management.ManagementFactory;
   import java.util.*;
   import java.util.concurrent.*;
   
   import com.zto.route.flink.rocketmq.common.util.RocketMQUtils;
   
   public class RocketMQSource<OUT> extends RichParallelSourceFunction<OUT>
           implements CheckpointedFunction, CheckpointListener, ResultTypeQueryable<OUT> {
       private static final Logger LOG = LoggerFactory.getLogger(RocketMQSource.class);
       private static final long serialVersionUID = 1L;
       // state name
       private static final String OFFSETS_STATE_NAME = "topic-partition-offset-states";
       ListState<Tuple2<MessageQueue, Long>> unionOffsetStates;
       private RunningChecker runningChecker;
       private volatile Object checkPointLock;
       private transient volatile boolean restored;
       private transient boolean enableCheckpoint;
       private RocketMQCollector rocketMQCollector;
       private List<MessageQueue> assignQueues;
       private ExecutorService executor;
       private DefaultLitePullConsumer consumer;
       private RocketMQDeserializationSchema<OUT> deserializer;
       private Properties props;
       private String topic;
       private String group;
       private Map<MessageQueue, Long> offsetTable = new ConcurrentHashMap<>();
       private ScheduledExecutorService timer;
       // watermark in source
       private WaterMarkPerQueue waterMarkPerQueue;
       private WaterMarkForAll waterMarkForAll;
       private Meter tpsMetric;
   
       public RocketMQSourceV3(RocketMQDeserializationSchema<OUT> deserializer, Properties props) {
           this.deserializer = deserializer;
           this.props = props;
       }
   
       @Override
       public void open(Configuration parameters) throws Exception {
           LOG.debug("source run ......");
   
           Validate.notEmpty(props, "Consumer properties can not be empty");
   
           this.topic = props.getProperty(RocketMQConfig.CONSUMER_TOPIC);
           this.group = props.getProperty(RocketMQConfig.CONSUMER_GROUP);
   
           Validate.notEmpty(topic, "Consumer topic can not be empty");
           Validate.notEmpty(group, "Consumer group can not be empty");
           this.enableCheckpoint = ((StreamingRuntimeContext) getRuntimeContext()).isCheckpointingEnabled();
   
           runningChecker = new RunningChecker();
           runningChecker.setRunning(true);
   
           rocketMQCollector = new RocketMQCollector();
   
           waterMarkPerQueue = new WaterMarkPerQueue(5000);
   
           waterMarkForAll = new WaterMarkForAll(5000);
   
           Counter outputCounter = getRuntimeContext().getMetricGroup()
                   .counter(MetricUtils.METRICS_TPS + "_counter", new SimpleCounter());
           tpsMetric = getRuntimeContext().getMetricGroup()
                   .meter(MetricUtils.METRICS_TPS, new MeterView(outputCounter, 60));
           final ThreadFactory threadFactory = new ThreadFactoryBuilder()
                   .setDaemon(true).setNameFormat("rmq-pull-thread-%d").build();
           executor = Executors.newCachedThreadPool(threadFactory);
   
           timer = Executors.newSingleThreadScheduledExecutor();
           // 初始化consumer 并分配队列
           startConsumer();
       }
   
       @Override
       public void run(SourceContext<OUT> context) throws Exception {
           checkPointLock = context.getCheckpointLock();
           timer.scheduleAtFixedRate(() -> {
               context.emitWatermark(waterMarkPerQueue.getCurrentWatermark());
               context.emitWatermark(waterMarkForAll.getCurrentWatermark());
           }, 5, 5, TimeUnit.SECONDS);
   
           this.executor.execute(() -> RetryUtil.call(() -> {
   
               while (runningChecker.isRunning()) {
                   List<MessageExt> messages = consumer.poll();
                   for (MessageExt msg : messages) {
                       synchronized (checkPointLock) {
                           deserializer.deserialize(msg, rocketMQCollector);
                           // get record from collector and use sourceContext emit it to down task
                           Queue<OUT> records = rocketMQCollector.getRecords();
                           OUT record;
                           while ((record = records.poll()) != null) {
                               context.collectWithTimestamp(record, msg.getBornTimestamp());
                           }
                           // record offset to offset table
                           recordBrokerOffset(msg);
   
                           // update max eventTime per queue
                           waterMarkPerQueue.extractTimestamp(buildMessageQueue(msg), msg.getBornTimestamp());
                           waterMarkForAll.extractTimestamp(msg.getBornTimestamp());
                           tpsMetric.markEvent();
                       }
                   }
               }
               return true;
           }, "RuntimeException"));
   
           awaitTermination();
       }
   
       private void awaitTermination() throws InterruptedException {
           while (runningChecker.isRunning()) {
               Thread.sleep(50);
           }
       }
   
       @Override
       public void cancel() {
           LOG.debug("cancel ...");
           runningChecker.setRunning(false);
   
           if (consumer != null) {
               consumer.shutdown();
           }
   
           if (offsetTable != null) {
               offsetTable.clear();
           }
       }
   
       @Override
       public void notifyCheckpointComplete(long checkpointId) throws Exception {
           if (!consumer.isAutoCommit()) {
               LOG.info("commit consumer offset .......");
               consumer.commitSync();
           }
       }
   
   
       @Override
       public void snapshotState(FunctionSnapshotContext context) throws Exception {
           // called when a snapshot for a checkpoint is requested
           LOG.info("Snapshotting state {} ...", context.getCheckpointId());
           if (!runningChecker.isRunning()) {
               LOG.info("snapshotState() called on closed source; returning null.");
               return;
           }
   
           // Discovery topic Route change when snapshot
           RetryUtil.call(() -> {
               List<MessageQueue> newQueues = getAssignQueues();
               Collections.sort(newQueues);
               LOG.debug(getRuntimeContext().getIndexOfThisSubtask() + " Topic route is same.");
               if (!assignQueues.equals(newQueues)) {
                   throw new RuntimeException();
               }
               return true;
           }, "RuntimeException due to topic route changed");
   
           unionOffsetStates.clear();
           for (Map.Entry<MessageQueue, Long> entry : offsetTable.entrySet()) {
               LOG.info("snapshot {}, offset {}", entry.getKey(), entry.getValue());
               unionOffsetStates.add(Tuple2.of(entry.getKey(), entry.getValue()));
           }
       }
   
       @Override
       public void initializeState(FunctionInitializationContext context) throws Exception {
           LOG.info("initialize State ...");
   
           unionOffsetStates = context.getOperatorStateStore().getUnionListState(new ListStateDescriptor<>(
                   OFFSETS_STATE_NAME, TypeInformation.of(new TypeHint<Tuple2<MessageQueue, Long>>() {
           })));
           this.restored = context.isRestored();
           if (restored) {
   
               unionOffsetStates.get().forEach(t -> offsetTable.put(t.f0, t.f1));
               LOG.info("Restore from state, {}", offsetTable);
           } else {
               LOG.info("No restored from state ......");
           }
       }
   
       @Override
       public TypeInformation<OUT> getProducedType() {
           return deserializer.getProducedType();
       }
   
       private void recordBrokerOffset(MessageExt message) {
           MessageQueue mq = buildMessageQueue(message);
           long queueOffset = message.getQueueOffset();
           offsetTable.put(mq, queueOffset);
           // 如果没有启用chk,并且没有启用自动提交,那么每次要提交offset
           if (!enableCheckpoint && !consumer.isAutoCommit()) {
               consumer.commitSync();
           }
       }
   
       private MessageQueue buildMessageQueue(MessageExt message) {
           String topic = message.getTopic();
           String brokerName = message.getBrokerName();
           int queueId = message.getQueueId();
           return new MessageQueue(topic, brokerName, queueId);
       }
   
       private void startConsumer() throws MQClientException {
           LOG.info("consumer start ");
           consumer = new DefaultLitePullConsumer(this.group, RocketMQConfig.buildAclRPCHook(props));
           String nameServers = props.getProperty(RocketMQConfig.NAME_SERVER_ADDR);
           Validate.notEmpty(nameServers);
           consumer.setNamesrvAddr(nameServers);
           consumer.setPollNameServerInterval(RocketMQUtils.getInteger(props,
                   RocketMQConfig.NAME_SERVER_POLL_INTERVAL, RocketMQConfig.DEFAULT_NAME_SERVER_POLL_INTERVAL));
           consumer.setHeartbeatBrokerInterval(RocketMQUtils.getInteger(props,
                   RocketMQConfig.BROKER_HEART_BEAT_INTERVAL, RocketMQConfig.DEFAULT_BROKER_HEART_BEAT_INTERVAL));
           String runtimeName = ManagementFactory.getRuntimeMXBean().getName();
           int indexOfThisSubTask = getRuntimeContext().getIndexOfThisSubtask();
           String instanceName = RocketMQUtils.getInstanceName(runtimeName, topic, group,
                   String.valueOf(indexOfThisSubTask), String.valueOf(System.nanoTime()));
           consumer.setInstanceName(instanceName);
   //        String tag = props.getProperty(RocketMQConfig.CONSUMER_TAG, RocketMQConfig.DEFAULT_CONSUMER_TAG);
   //        consumer.subscribe(this.topic, tag);
           boolean autoCommit = RocketMQUtils.getBoolean(props, RocketMQConfig.OFFSET_AUTO_COMMIT, false);
           consumer.setAutoCommit(autoCommit);
           consumer.start();
           consumer.assign(getAssignQueues());
           // 从offsetTable中移除不在本task 中分配的队列,做snapshot,并从恢复的offset中,seek到上次的offset
           removeUnAssignQueues();
   
           // 每个MessageQueue指定offset消费
           perQueueSeekToSpecialOffset();
       }
   
       private List<MessageQueue> getAssignQueues() throws MQClientException {
           final RuntimeContext ctx = getRuntimeContext();
           int taskNumber = ctx.getNumberOfParallelSubtasks();
           int taskIndex = ctx.getIndexOfThisSubtask();
           Collection<MessageQueue> totalQueues = consumer.fetchMessageQueues(this.topic);
           List<MessageQueue> shouldAssignQueues = RocketMQUtils.allocate(totalQueues, taskNumber, taskIndex);
           assignQueues = shouldAssignQueues;
           return shouldAssignQueues;
       }
   
       // 从offsetTable中移除此consumer未消费的信息
       private void removeUnAssignQueues() throws MQClientException {
           // offset table 开始从状态恢复,保存的是全部queue的信息,移除多余的
           this.offsetTable.forEach((k, v) -> {
               if (!this.assignQueues.contains(k)) {
                   this.offsetTable.remove(k);
               }
           });
       }
   
       // 根据不同策略,指定不同offset
       private void perQueueSeekToSpecialOffset() throws MQClientException {
           for (MessageQueue mq : this.assignQueues) {
               if (this.offsetTable.containsKey(mq)) {
                   long offset = this.offsetTable.get(mq) + 1;
                   LOG.info("consumer seek {} from state, offset {}", mq, offset);
                   this.consumer.seek(mq, offset);
               } else {
                   // 如果是从状态恢复,但是找不到,那么这个offset 就是新加入的
                   if (this.restored) {
                       this.consumer.seekToBegin(mq);
                   } else {
                       // 不是restored,那么就是直接重启,根据提供的策略选择offset
                       String offsetFrom = props.getProperty(RocketMQConfig.CONSUMER_OFFSET_RESET_TO, Offset.LATEST.toString());
                       Offset initialOffsetFrom = Offset.valueOf(offsetFrom);
                       switch (initialOffsetFrom) {
                           case EARLIEST:
                               consumer.seekToBegin(mq);
                               LOG.info("{} seek to begin ......", mq);
                               break;
                           case LATEST:
                               consumer.seekToEnd(mq);
                               LOG.info("{} seek to end ......", mq);
                               break;
                           case STORE:
                               break;
                           case TIMESTAMP:
                               LOG.info("{} seek to timestamp {} ......", mq, System.currentTimeMillis());
                               consumer.seek(mq, System.currentTimeMillis());
                               break;
                           default:
                               break;
                       }
                   }
               }
           }
       }
   
       // 用来保存值
       private class RocketMQCollector implements Collector<OUT> {
           private final Queue<OUT> records = new ArrayDeque<>();
           private boolean endOfStreamSignalled = false;
   
           @Override
           public void collect(OUT record) {
               // do not emit subsequent elements if the end of the stream reached
               if (endOfStreamSignalled || deserializer.isEndOfStream(record)) {
                   endOfStreamSignalled = true;
                   return;
               }
               records.add(record);
           }
   
           public Queue<OUT> getRecords() {
               return records;
           }
   
           public boolean isEndOfStreamSignalled() {
               return endOfStreamSignalled;
           }
   
           public void setEndOfStreamSignalled(boolean endOfStreamSignalled) {
               this.endOfStreamSignalled = endOfStreamSignalled;
           }
   
           @Override
           public void close() {
   
           }
       }
   }
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [rocketmq-flink] SteNicholas commented on issue #19: Directly created DefaultMQPullConsumer can't update offset to broker in notifyCheckpointComplete function

Posted by GitBox <gi...@apache.org>.
SteNicholas commented on issue #19:
URL: https://github.com/apache/rocketmq-flink/issues/19#issuecomment-963104645


   @wangyongchun8888, do you have an example that reproduces the problem you mentioned?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org