You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/09/19 11:17:58 UTC

[inlong] branch master updated: [INLONG-5722][Agent] Support Redis Source (#5780)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new dfd57b119 [INLONG-5722][Agent] Support Redis Source (#5780)
dfd57b119 is described below

commit dfd57b1192832857b696c1e94a7b6b147b41051c
Author: iamsee123 <61...@users.noreply.github.com>
AuthorDate: Mon Sep 19 19:17:53 2022 +0800

    [INLONG-5722][Agent] Support Redis Source (#5780)
---
 inlong-agent/agent-plugins/pom.xml                 |   4 +
 .../inlong/agent/plugin/sources/RedisSource.java   |  49 +++
 .../agent/plugin/sources/reader/RedisReader.java   | 447 +++++++++++++++++++++
 .../agent/plugin/sources/RedisReaderTest.java      |  57 +++
 licenses/inlong-agent/LICENSE                      |   1 +
 pom.xml                                            |   6 +
 6 files changed, 564 insertions(+)

diff --git a/inlong-agent/agent-plugins/pom.xml b/inlong-agent/agent-plugins/pom.xml
index c00e5351f..5cb9405a4 100644
--- a/inlong-agent/agent-plugins/pom.xml
+++ b/inlong-agent/agent-plugins/pom.xml
@@ -98,6 +98,10 @@
             <groupId>io.debezium</groupId>
             <artifactId>debezium-connector-postgres</artifactId>
         </dependency>
+        <dependency>
+            <groupId>com.moilioncircle</groupId>
+            <artifactId>redis-replicator</artifactId>
+        </dependency>
         <dependency>
             <artifactId>awaitility</artifactId>
             <groupId>org.awaitility</groupId>
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/RedisSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/RedisSource.java
new file mode 100644
index 000000000..48003c8de
--- /dev/null
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/RedisSource.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.sources;
+
+import org.apache.inlong.agent.conf.JobProfile;
+import org.apache.inlong.agent.plugin.Reader;
+import org.apache.inlong.agent.plugin.sources.reader.RedisReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Redis source
+ */
+public class RedisSource extends AbstractSource {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(RedisSource.class);
+
+    public RedisSource() {
+
+    }
+
+    @Override
+    public List<Reader> split(JobProfile conf) {
+        super.init(conf);
+        Reader redisReader = new RedisReader();
+        List<Reader> readerList = new ArrayList<>();
+        readerList.add(redisReader);
+        sourceMetric.sourceSuccessCount.incrementAndGet();
+        return readerList;
+    }
+}
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/RedisReader.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/RedisReader.java
new file mode 100644
index 000000000..a59f7eaf5
--- /dev/null
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/RedisReader.java
@@ -0,0 +1,447 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.sources.reader;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.TypeAdapter;
+import com.google.gson.stream.JsonReader;
+import com.google.gson.stream.JsonWriter;
+import com.moilioncircle.redis.replicator.RedisReplicator;
+import com.moilioncircle.redis.replicator.Replicator;
+import com.moilioncircle.redis.replicator.cmd.CommandName;
+import com.moilioncircle.redis.replicator.cmd.impl.DefaultCommand;
+import com.moilioncircle.redis.replicator.cmd.parser.DefaultCommandParser;
+import com.moilioncircle.redis.replicator.event.Event;
+import com.moilioncircle.redis.replicator.event.EventListener;
+import com.moilioncircle.redis.replicator.event.PostRdbSyncEvent;
+import com.moilioncircle.redis.replicator.rdb.datatype.KeyStringValueHash;
+import com.moilioncircle.redis.replicator.rdb.datatype.KeyStringValueList;
+import com.moilioncircle.redis.replicator.rdb.datatype.KeyStringValueSet;
+import com.moilioncircle.redis.replicator.rdb.datatype.KeyStringValueString;
+import com.moilioncircle.redis.replicator.rdb.datatype.KeyStringValueZSet;
+import com.moilioncircle.redis.replicator.rdb.datatype.KeyValuePair;
+import com.moilioncircle.redis.replicator.rdb.datatype.ZSetEntry;
+import org.apache.commons.lang.StringUtils;
+import org.apache.inlong.agent.conf.JobProfile;
+import org.apache.inlong.agent.message.DefaultMessage;
+import org.apache.inlong.agent.metrics.audit.AuditUtils;
+import org.apache.inlong.agent.plugin.Message;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * Redis data reader
+ */
+public class RedisReader extends AbstractReader {
+
+    public static final String REDIS_READER_TAG_NAME = "AgentRedisMetric";
+    public static final String JOB_REDIS_PORT = "job.redisJob.port";
+    public static final String JOB_REDIS_HOSTNAME = "job.redisJob.hostname";
+    public static final String JOB_REDIS_SSL = "job.redisJob.ssl";
+    public static final String JOB_REDIS_AUTHUSER = "job.redisJob.authUser";
+    public static final String JOB_REDIS_AUTHPASSWORD = "job.redisJob.authPassword";
+    public static final String JOB_REDIS_READTIMEOUT = "job.redisJob.readTimeout";
+    public static final String JOB_REDIS_QUEUE_SIZE = "job.redisJob.queueSize";
+    public static final String JOB_REDIS_REPLID = "job.redisJob.replId";
+    public static final String JOB_REDIS_OFFSET = "job.redisJob.offset";
+    private static final Logger LOGGER = LoggerFactory.getLogger(RedisReader.class);
+    private String port;
+    private String hostName;
+    private boolean ssl;
+    private String authUser;
+    private String authPassword;
+    private String readTimeout;
+    private String instanceId;
+    private String replId;
+    private String snapShot;
+    private boolean destroyed;
+    private Replicator redisReplicator;
+    private LinkedBlockingQueue<String> redisMessageQueue;
+    private boolean finished = false;
+    private ExecutorService executor;
+    private Gson gson;
+
+    @Override
+    public void init(JobProfile jobConf) {
+        super.init(jobConf);
+        LOGGER.info("Init redis reader with jobConf {}", jobConf.toJsonStr());
+        port = jobConf.get(JOB_REDIS_PORT);
+        hostName = jobConf.get(JOB_REDIS_HOSTNAME);
+        ssl = jobConf.getBoolean(JOB_REDIS_SSL, false);
+        authUser = jobConf.get(JOB_REDIS_AUTHUSER, "");
+        authPassword = jobConf.get(JOB_REDIS_AUTHPASSWORD, "");
+        readTimeout = jobConf.get(JOB_REDIS_READTIMEOUT, "");
+        replId = jobConf.get(JOB_REDIS_REPLID, "");
+        snapShot = jobConf.get(JOB_REDIS_OFFSET, "-1");
+        instanceId = jobConf.getInstanceId();
+        finished = false;
+        redisMessageQueue = new LinkedBlockingQueue<>(jobConf.getInt(JOB_REDIS_QUEUE_SIZE, 10000));
+        initGson();
+        String uri = getRedisUri();
+        try {
+            redisReplicator = new RedisReplicator(uri);
+            initReplicator();
+            redisReplicator.addEventListener(new EventListener() {
+                @Override
+                public void onEvent(Replicator replicator, Event event) {
+                    try {
+                        if (event instanceof DefaultCommand || event instanceof KeyValuePair<?, ?>) {
+                            redisMessageQueue.put(gson.toJson(event));
+                            AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS, inlongGroupId, inlongStreamId,
+                                    System.currentTimeMillis(), 1);
+                            readerMetric.pluginReadCount.incrementAndGet();
+                        }
+                        if (event instanceof PostRdbSyncEvent) {
+                            snapShot = String.valueOf(replicator.getConfiguration().getReplOffset());
+                            LOGGER.info("after rdb snapShot is: {}", snapShot);
+                        }
+                    } catch (InterruptedException e) {
+                        readerMetric.pluginReadFailCount.incrementAndGet();
+                        LOGGER.error("Read redis data error", e);
+                    }
+                }
+            });
+            executor = Executors.newSingleThreadExecutor();
+            executor.execute(new Thread(() -> {
+                try {
+                    redisReplicator.open();
+                } catch (IOException e) {
+                    LOGGER.error("Redis source error", e);
+                }
+            }));
+        } catch (URISyntaxException | IOException e) {
+            readerMetric.pluginReadFailCount.addAndGet(1);
+            LOGGER.error("Connect to redis {}:{} failed.", hostName, port);
+        }
+    }
+
+    private String getRedisUri() {
+        StringBuffer sb = new StringBuffer("redis://");
+        sb.append(hostName).append(":").append(port);
+        sb.append("?");
+        if (!StringUtils.isEmpty(authPassword)) {
+            sb.append("authPassword=").append(authPassword).append("&");
+        }
+        if (!StringUtils.isEmpty(authUser)) {
+            sb.append("authUser=").append(authUser).append("&");
+        }
+        if (!StringUtils.isEmpty(readTimeout)) {
+            sb.append("readTimeout=").append(readTimeout).append("&");
+        }
+        if (ssl) {
+            sb.append("ssl=").append("yes").append("&");
+        }
+        if (!StringUtils.isEmpty(snapShot)) {
+            sb.append("replOffset=").append(snapShot).append("&");
+        }
+        if (!StringUtils.isEmpty(replId)) {
+            sb.append("replId=").append(replId).append("&");
+        }
+        if (sb.charAt(sb.length() - 1) == '?' || sb.charAt(sb.length() - 1) == '&') {
+            sb.deleteCharAt(sb.length() - 1);
+        }
+        return sb.toString();
+    }
+
+    @Override
+    public void destroy() {
+        synchronized (this) {
+            if (!destroyed) {
+                try {
+                    executor.shutdown();
+                    redisReplicator.close();
+                } catch (IOException e) {
+                    LOGGER.error("Redis reader close failed.");
+                }
+                destroyed = true;
+            }
+        }
+    }
+
+    @Override
+    public Message read() {
+        if (!redisMessageQueue.isEmpty()) {
+            readerMetric.pluginReadCount.incrementAndGet();
+            return new DefaultMessage(redisMessageQueue.poll().getBytes());
+        } else {
+            return null;
+        }
+    }
+
+    @Override
+    public boolean isFinished() {
+        return finished;
+    }
+
+    @Override
+    public String getReadSource() {
+        return instanceId;
+    }
+
+    @Override
+    public void setReadTimeout(long mill) {
+
+    }
+
+    @Override
+    public void setWaitMillisecond(long millis) {
+
+    }
+
+    @Override
+    public String getSnapshot() {
+        return snapShot;
+    }
+
+    @Override
+    public void finishRead() {
+        finished = true;
+    }
+
+    @Override
+    public boolean isSourceExist() {
+        return true;
+    }
+
+    /**
+     * init GSON parser
+     */
+    private void initGson() {
+        gson = new GsonBuilder().registerTypeAdapter(KeyStringValueHash.class, new TypeAdapter<KeyStringValueHash>() {
+
+                    @Override
+                    public void write(JsonWriter out, KeyStringValueHash kv) throws IOException {
+                        out.beginObject();
+                        out.name("DB").beginObject();
+                        out.name("dbNumber").value(kv.getDb().getDbNumber());
+                        out.name("dbSize").value(kv.getDb().getDbsize());
+                        out.name("expires").value(kv.getDb().getExpires());
+                        out.endObject();
+                        out.name("valueRdbType").value(kv.getValueRdbType());
+                        out.name("key").value(new String(kv.getKey()));
+                        out.name("value").beginObject();
+                        for (byte[] b : kv.getValue().keySet()) {
+                            out.name(new String(b)).value(new String(kv.getValue().get(b)));
+                        }
+                        out.endObject();
+                        out.endObject();
+                    }
+
+                    @Override
+                    public KeyStringValueHash read(JsonReader in) throws IOException {
+                        return null;
+                    }
+                }).registerTypeAdapter(DefaultCommand.class, new TypeAdapter<DefaultCommand>() {
+                    @Override
+                    public void write(JsonWriter out, DefaultCommand dc) throws IOException {
+                        out.beginObject();
+                        out.name("key").value(new String(dc.getCommand()));
+                        out.name("value").beginArray();
+                        for (byte[] bytes : dc.getArgs()) {
+                            out.value(new String(bytes));
+                        }
+                        out.endArray();
+                        out.endObject();
+                    }
+
+                    @Override
+                    public DefaultCommand read(JsonReader in) throws IOException {
+                        return null;
+                    }
+                })
+                .registerTypeAdapter(KeyStringValueList.class, new TypeAdapter<KeyStringValueList>() {
+
+                    @Override
+                    public void write(JsonWriter out, KeyStringValueList kv) throws IOException {
+                        out.beginObject();
+                        out.name("key").value(new String(kv.getKey()));
+                        out.name("value").beginArray();
+                        for (byte[] bytes : kv.getValue()) {
+                            out.value(new String(bytes));
+                        }
+                        out.endArray();
+                        out.endObject();
+                    }
+
+                    @Override
+                    public KeyStringValueList read(JsonReader in) throws IOException {
+                        return null;
+                    }
+                })
+                .registerTypeAdapter(KeyStringValueSet.class, new TypeAdapter<KeyStringValueSet>() {
+                    @Override
+                    public void write(JsonWriter out, KeyStringValueSet kv) throws IOException {
+                        out.beginObject();
+                        out.name("key").value(new String(kv.getKey()));
+                        out.name("value").beginArray();
+                        for (byte[] bytes : kv.getValue()) {
+                            out.value(new String(bytes));
+                        }
+                        out.endArray();
+                        out.endObject();
+                    }
+
+                    @Override
+                    public KeyStringValueSet read(JsonReader in) throws IOException {
+                        return null;
+                    }
+                })
+                .registerTypeAdapter(KeyStringValueString.class, new TypeAdapter<KeyStringValueString>() {
+                    @Override
+                    public void write(JsonWriter out, KeyStringValueString kv) throws IOException {
+                        out.beginObject();
+                        out.name("key").value(new String(kv.getKey()));
+                        out.name("value").value(new String(kv.getValue()));
+                        out.endObject();
+                    }
+
+                    @Override
+                    public KeyStringValueString read(JsonReader in) throws IOException {
+                        return null;
+                    }
+                })
+                .registerTypeAdapter(KeyStringValueZSet.class, new TypeAdapter<KeyStringValueZSet>() {
+                    @Override
+                    public void write(JsonWriter out, KeyStringValueZSet kv) throws IOException {
+                        out.beginObject();
+                        out.name("key").value(new String(kv.getKey()));
+                        out.name("value").beginArray();
+                        for (ZSetEntry entry : kv.getValue()) {
+                            out.beginObject();
+                            out.name("element").value(new String(entry.getElement()));
+                            out.name("score").value(entry.getScore());
+                            out.endObject();
+                        }
+                        out.endArray();
+                        out.endObject();
+                    }
+
+                    @Override
+                    public KeyStringValueZSet read(JsonReader in) throws IOException {
+                        return null;
+                    }
+                })
+                .create();
+    }
+
+    /**
+     * init replicator's commandParser
+     */
+    private void initReplicator() {
+        DefaultCommandParser defaultCommandParser = new DefaultCommandParser();
+        redisReplicator.addCommandParser(CommandName.name("APPEND"), defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("SET"), defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("SETEX"), defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("MSET"), defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("DEL"), defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("SADD"), defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("HMSET"), defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("HSET"), defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("LSET"), defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("EXPIRE"), defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("EXPIREAT"), defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("GETSET"), defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("HSETNX"), defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("MSETNX"), defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("PSETEX"), defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("SETNX"), defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("SETRANGE"), defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("HDEL"), defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("LPOP"), defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("LPUSH"), defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("LPUSHX"), defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("LRem"), defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("RPOP"), defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("RPUSH"), defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("RPUSHX"), defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("ZREM"), defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("RENAME"), defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("INCR"), defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("DECR"), defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("INCRBY"), defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("DECRBY"), defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("PERSIST"), defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("SELECT"), defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("FLUSHALL"), defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("FLUSHDB"), defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("HINCRBY"), defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("ZINCRBY"), defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("MOVE"), defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("SMOVE"), defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("PFADD"), defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("PFCOUNT"), defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("PFMERGE"), defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("SDIFFSTORE"), defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("SINTERSTORE"), defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("SUNIONSTORE"), defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("ZADD"), defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("ZINTERSTORE"), defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("ZUNIONSTORE"), defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("BRPOPLPUSH"), defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("LINSERT"), defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("RENAMENX"), defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("RESTORE"), defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("PEXPIRE"), defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("PEXPIREAT"), defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("GEOADD"), defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("EVAL"), defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("EVALSHA"), defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("SCRIPT"), defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("PUBLISH"), defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("BITOP"), defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("BITFIELD"), defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("SETBIT"), defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("SREM"), defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("UNLINK"), defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("SWAPDB"), defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("MULTI"), defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("EXEC"), defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("ZREMRANGEBYSCORE"), defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("ZREMRANGEBYRANK"), defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("ZREMRANGEBYLEX"), defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("LTRIM"), defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("SORT"), defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("RPOPLPUSH"), defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("ZPOPMIN"), defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("ZPOPMAX"), defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("REPLCONF"), defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("XACK"), defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("XADD"), defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("XCLAIM"), defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("XDEL"), defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("XGROUP"), defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("XTRIM"), defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("XSETID"), defaultCommandParser);
+        // since redis 6.2
+        redisReplicator.addCommandParser(CommandName.name("COPY"), defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("LMOVE"), defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("BLMOVE"), defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("ZDIFFSTORE"), defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("GEOSEARCHSTORE"), defaultCommandParser);
+        // since redis 7.0
+        redisReplicator.addCommandParser(CommandName.name("SPUBLISH"), defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("FUNCTION"), defaultCommandParser);
+    }
+}
diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/RedisReaderTest.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/RedisReaderTest.java
new file mode 100644
index 000000000..3903488df
--- /dev/null
+++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/RedisReaderTest.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.sources;
+
+import org.apache.inlong.agent.conf.JobProfile;
+import org.apache.inlong.agent.plugin.Message;
+import org.apache.inlong.agent.plugin.sources.reader.RedisReader;
+import org.junit.Ignore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Test for redis reader
+ */
+public class RedisReaderTest {
+
+    private static Logger LOGGER = LoggerFactory.getLogger(RedisReaderTest.class);
+
+    /**
+     * this test is used for testing collect data from redis in unit test
+     * just use in local test
+     */
+    @Ignore
+    public void redisLoadTest() {
+        JobProfile jobProfile = new JobProfile();
+        jobProfile.set(RedisReader.JOB_REDIS_HOSTNAME, "localhost");
+        jobProfile.set(RedisReader.JOB_REDIS_PORT, "6379");
+        jobProfile.set(RedisReader.JOB_REDIS_OFFSET, "7945");
+        jobProfile.set(RedisReader.JOB_REDIS_REPLID, "33a016fc98ad27f36218c7648a7a0774a79547d8");
+        jobProfile.set("job.instance.id", "_1");
+        RedisReader redisReader = new RedisReader();
+        redisReader.init(jobProfile);
+        while (true) {
+            Message message = redisReader.read();
+            if (message != null) {
+                LOGGER.info("read message is {}", message);
+                break;
+            }
+        }
+    }
+
+}
diff --git a/licenses/inlong-agent/LICENSE b/licenses/inlong-agent/LICENSE
index 04b79a5bd..9738923c3 100644
--- a/licenses/inlong-agent/LICENSE
+++ b/licenses/inlong-agent/LICENSE
@@ -425,6 +425,7 @@ The text of each license is the standard Apache 2.0 license.
   org.codehaus.plexus:plexus-utils:3.2.1 - Plexus Common Utilities (http://codehaus-plexus.github.io/plexus-utils/), (Apache License, Version 2.0)
   org.apache.pulsar:pulsar-client-admin-api:2.8.1 - Pulsar Client Admin :: API (https://github.com/apache/pulsar/tree/v2.8.1), (Apache License, Version 2.0)
   org.apache.pulsar:pulsar-client-api:2.8.1 - Pulsar Client :: API (https://github.com/apache/pulsar/tree/v2.8.1), (Apache License, Version 2.0)
+  com.moilioncircle:redis-replicator:3.6.4 - redis-replicator (https://github.com/leonchen83/redis-replicator), (Apache License, Version 2.0)
   org.reflections:reflections:0.10.2 - Reflections (https://github.com/ronmamo/reflections/tree/0.10.2), (The Apache Software License, Version 2.0;  WTFPL)
   org.rocksdb:rocksdbjni:6.14.6 - RocksDB JNI (https://github.com/facebook/rocksdb/tree/v6.14.6), (Apache License 2.0;  GNU General Public License, version 2)
   org.scala-lang.modules:scala-collection-compat_2.11:2.1.2 - scala-collection-compat (https://github.com/scala/scala-collection-compat/tree/v2.1.2), (Apache-2.0)
diff --git a/pom.xml b/pom.xml
index 943a3c0ea..ec96f45e4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -145,6 +145,7 @@
         <h2.mysql.version>2.0.0</h2.mysql.version>
         <debezium.version>1.8.0.Final</debezium.version>
         <rocksdb.version>6.14.6</rocksdb.version>
+        <redis-replicator.version>3.6.4</redis-replicator.version>
         <hadoop.version>2.10.2</hadoop.version>
         <postgresql.version>42.4.1</postgresql.version>
         <oracle.jdbc.version>19.3.0.0</oracle.jdbc.version>
@@ -625,6 +626,11 @@
                 <artifactId>h2-functions-4-mysql</artifactId>
                 <version>${h2.mysql.version}</version>
             </dependency>
+            <dependency>
+                <groupId>com.moilioncircle</groupId>
+                <artifactId>redis-replicator</artifactId>
+                <version>${redis-replicator.version}</version>
+            </dependency>
 
             <!--spring -->
             <dependency>