You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by GitBox <gi...@apache.org> on 2022/12/13 11:07:26 UTC
[GitHub] [rocketmq-flink] godfather1103 commented on issue #81: Consumption duplication
godfather1103 commented on issue #81:
URL: https://github.com/apache/rocketmq-flink/issues/81#issuecomment-1348202411
package demo.rocketmq;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.flink.legacy.RocketMQConfig;
import org.apache.rocketmq.flink.legacy.RocketMQSourceFunction;
import org.apache.rocketmq.flink.legacy.common.config.OffsetResetStrategy;
import org.apache.rocketmq.flink.legacy.common.serialization.SimpleKeyValueDeserializationSchema;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
/**
 * Example Flink job that reads key/body records from the RocketMQ topic
 * {@code flink-source}, decorates each record, serializes it to JSON and prints it.
 *
 * <p>Checkpointing is enabled in exactly-once mode with a 10 second interval, and
 * externalized checkpoints are retained when the job is cancelled.
 */
public class RocketMQFlinkExample {

    /**
     * Builds the streaming pipeline and runs it.
     *
     * @param args command-line arguments; parsed with {@link ParameterTool} and registered
     *             as the job's global parameters
     * @throws IllegalStateException if job execution fails; the original exception is
     *                               attached as the cause
     */
    public static void main(String[] args) {
        final ParameterTool params = ParameterTool.fromArgs(args);

        System.setProperty(ClientLogger.CLIENT_LOG_USESLF4J, "true");

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().setGlobalJobParameters(params);
        env.setStateBackend(new MemoryStateBackend());
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // start a checkpoint every 10s
        env.enableCheckpointing(10000);

        // advanced options:
        final CheckpointConfig checkpointConfig = env.getCheckpointConfig();
        // set mode to exactly-once (this is the default)
        checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // checkpoints have to complete within one minute, or are discarded
        checkpointConfig.setCheckpointTimeout(60000);
        // make sure 500 ms of progress happen between checkpoints
        checkpointConfig.setMinPauseBetweenCheckpoints(500);
        // allow only one checkpoint to be in progress at the same time
        checkpointConfig.setMaxConcurrentCheckpoints(1);
        // enable externalized checkpoints which are retained after job cancellation
        checkpointConfig.enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        Properties consumerProps = new Properties();
        consumerProps.setProperty(RocketMQConfig.NAME_SERVER_ADDR, "127.0.0.1:9876");
        consumerProps.setProperty(RocketMQConfig.CONSUMER_GROUP, "flink-source-group");
        consumerProps.setProperty(RocketMQConfig.CONSUMER_TOPIC, "flink-source");

        // Only needed by the RocketMQ sink that is currently commented out below.
        Properties producerProps = new Properties();
        producerProps.setProperty(RocketMQConfig.NAME_SERVER_ADDR, "127.0.0.1:9876");

        final SimpleKeyValueDeserializationSchema schema =
                new SimpleKeyValueDeserializationSchema("key", "body");

        // NOTE(review): the constructor is deliberately left raw. The schema is presumably
        // declared against the raw Map type, in which case a diamond would not infer
        // Map<Object, Object> - confirm against the connector before tightening this.
        @SuppressWarnings({"unchecked", "rawtypes"})
        RocketMQSourceFunction<Map<Object, Object>> source =
                new RocketMQSourceFunction(schema, consumerProps);

        // use group offsets.
        // If there is no committed offset, consumer would start from the latest offset.
        source.setStartFromGroupOffsets(OffsetResetStrategy.LATEST);

        env.addSource(source)
                .name("rocketmq-source")
                .rebalance()
                .process(new ProcessFunction<Map<Object, Object>, Map<Object, Object>>() {
                    @Override
                    public void processElement(
                            Map<Object, Object> in,
                            Context ctx,
                            Collector<Map<Object, Object>> out) {
                        // Four entries are stored; a capacity of 8 holds them without the
                        // intermediate rehashes that a capacity of 2 would trigger.
                        Map<Object, Object> result = new HashMap<>(8);
                        result.put("time", Long.toString(System.currentTimeMillis()));
                        result.put("queueId", in.get("queueId"));
                        result.put("key", "这里是相关Key=" + in.get("key"));
                        result.put("body", "这里是相关body=" + in.get("body"));
                        out.collect(result);
                    }
                })
                .name("upper-processor")
                .rebalance()
                .process(new ProcessFunction<Map<Object, Object>, String>() {
                    @Override
                    public void processElement(
                            Map<Object, Object> value,
                            Context ctx,
                            Collector<String> out) throws Exception {
                        out.collect(JSONObject.toJSONString(value));
                    }
                })
                .rebalance()
                .print()
                // Alternative sink, disabled in favour of print():
                // .addSink(new RocketMQSink(producerProps).withBatchFlushOnCheckpoint(true))
                .name("rocketmq-sink");

        try {
            env.execute("rocketmq-flink-example");
        } catch (Exception e) {
            // Propagate instead of printing the stack trace and returning normally, so a
            // failed job is not reported to the caller as a successful run.
            throw new IllegalStateException("Flink job 'rocketmq-flink-example' failed", e);
        }
    }
}
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@rocketmq.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org