Posted to commits@rocketmq.apache.org by GitBox <gi...@apache.org> on 2022/12/13 11:07:03 UTC

[GitHub] [rocketmq-flink] godfather1103 opened a new issue, #81: Consumption duplication

godfather1103 opened a new issue, #81:
URL: https://github.com/apache/rocketmq-flink/issues/81

   Messages are consumed more than once when running the demo.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [rocketmq-flink] godfather1103 commented on issue #81: Consumption duplication

Posted by GitBox <gi...@apache.org>.
godfather1103 commented on issue #81:
URL: https://github.com/apache/rocketmq-flink/issues/81#issuecomment-1348213919

   <img width="1135" alt="iShot_2022-12-13_19 11 12" src="https://user-images.githubusercontent.com/11797964/207302640-6ececdd3-041a-4bb1-b990-6f655dd01d42.png">
   




[GitHub] [rocketmq-flink] godfather1103 commented on issue #81: Consumption duplication

Posted by GitBox <gi...@apache.org>.
godfather1103 commented on issue #81:
URL: https://github.com/apache/rocketmq-flink/issues/81#issuecomment-1350573670

   This works




[GitHub] [rocketmq-flink] godfather1103 commented on issue #81: Consumption duplication

Posted by GitBox <gi...@apache.org>.
godfather1103 commented on issue #81:
URL: https://github.com/apache/rocketmq-flink/issues/81#issuecomment-1348202411

   package demo.rocketmq;
   
   import com.alibaba.fastjson.JSONObject;
   import org.apache.flink.api.java.utils.ParameterTool;
   import org.apache.flink.runtime.state.memory.MemoryStateBackend;
   import org.apache.flink.streaming.api.CheckpointingMode;
   import org.apache.flink.streaming.api.TimeCharacteristic;
   import org.apache.flink.streaming.api.environment.CheckpointConfig;
   import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
   import org.apache.flink.streaming.api.functions.ProcessFunction;
   import org.apache.flink.util.Collector;
   import org.apache.rocketmq.client.log.ClientLogger;
   import org.apache.rocketmq.flink.legacy.RocketMQConfig;
   import org.apache.rocketmq.flink.legacy.RocketMQSourceFunction;
   import org.apache.rocketmq.flink.legacy.common.config.OffsetResetStrategy;
   import org.apache.rocketmq.flink.legacy.common.serialization.SimpleKeyValueDeserializationSchema;
   
   import java.util.HashMap;
   import java.util.Map;
   import java.util.Properties;
   
   public class RocketMQFlinkExample {
       public static void main(String[] args) {
   
           final ParameterTool params = ParameterTool.fromArgs(args);
   
           System.setProperty(ClientLogger.CLIENT_LOG_USESLF4J, "true");
           final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
   
           env.getConfig().setGlobalJobParameters(params);
           env.setStateBackend(new MemoryStateBackend());
           env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
   
           // start a checkpoint every 10s
           env.enableCheckpointing(10000);
           // advanced options:
           // set mode to exactly-once (this is the default)
           env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
           // checkpoints have to complete within one minute, or are discarded
           env.getCheckpointConfig().setCheckpointTimeout(60000);
           // make sure 500 ms of progress happen between checkpoints
           env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
           // allow only one checkpoint to be in progress at the same time
           env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
           // enable externalized checkpoints which are retained after job cancellation
           env.getCheckpointConfig()
                   .enableExternalizedCheckpoints(
                           CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
   
           // enable checkpoint
   //        env.enableCheckpointing(3000);
   
           Properties consumerProps = new Properties();
           consumerProps.setProperty(RocketMQConfig.NAME_SERVER_ADDR, "127.0.0.1:9876");
           consumerProps.setProperty(RocketMQConfig.CONSUMER_GROUP, "flink-source-group");
           consumerProps.setProperty(RocketMQConfig.CONSUMER_TOPIC, "flink-source");
   
           Properties producerProps = new Properties();
           producerProps.setProperty(RocketMQConfig.NAME_SERVER_ADDR, "127.0.0.1:9876");
           final SimpleKeyValueDeserializationSchema schema = new SimpleKeyValueDeserializationSchema("key", "body");
           RocketMQSourceFunction<Map<Object, Object>> source = new RocketMQSourceFunction(
                   schema,
                   consumerProps
           );
           // use group offsets.
           // If there is no committed offset, the consumer starts from the latest offset.
           source.setStartFromGroupOffsets(OffsetResetStrategy.LATEST);
           env.addSource(source)
                   .name("rocketmq-source")
                   .rebalance()
                   .process(new ProcessFunction<Map<Object, Object>, Map<Object, Object>>() {
                       @Override
                       public void processElement(
                               Map<Object, Object> in,
                               Context ctx,
                               Collector<Map<Object, Object>> out) {
                           // Typed map instead of a raw HashMap; it holds four entries.
                           Map<Object, Object> result = new HashMap<>(4);
                           result.put("time", Long.toString(System.currentTimeMillis()));
                           result.put("queueId", in.get("queueId"));
                           result.put("key", "Here is the related key=" + in.get("key"));
                           result.put("body", "Here is the related body=" + in.get("body"));
                           out.collect(result);
                       }
                   })
                   .name("upper-processor")
                   .rebalance()
                   .process(new ProcessFunction<Map<Object, Object>, String>() {
                       @Override
                       public void processElement(Map<Object, Object> value, Context ctx, Collector<String> out) throws Exception {
                           String jsonString = JSONObject.toJSONString(value);
                           out.collect(jsonString);
                       }
                   })
                   .rebalance()
                   .print()
   //                .addSink(new RocketMQSink(producerProps).withBatchFlushOnCheckpoint(true))
                   .name("rocketmq-sink");
   
           try {
               env.execute("rocketmq-flink-example");
           } catch (Exception e) {
               e.printStackTrace();
           }
       }
   }
   




[GitHub] [rocketmq-flink] godfather1103 commented on issue #81: Consumption duplication

Posted by GitBox <gi...@apache.org>.
godfather1103 commented on issue #81:
URL: https://github.com/apache/rocketmq-flink/issues/81#issuecomment-1348210472

   [RocketMQFlinkExample.txt](https://github.com/apache/rocketmq-flink/files/10217229/RocketMQFlinkExample.txt)
   

