You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/03/02 03:23:47 UTC

[rocketmq-connect] 03/08: Support redis 6 (#510)

This is an automated email from the ASF dual-hosted git repository.

zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git

commit 1290dde8f7410e63b9bcf562bd6ccf76fb982fec
Author: Baoyi Chen <ch...@qq.com>
AuthorDate: Wed Feb 12 14:39:07 2020 +0800

    Support redis 6 (#510)
---
 pom.xml                                            |  2 +-
 .../rocketmq/connect/redis/common/Options.java     |  1 +
 .../connect/redis/common/RedisConstants.java       |  1 +
 .../rocketmq/connect/redis/parser/SetParser.java   | 47 +++++++++-------------
 4 files changed, 21 insertions(+), 30 deletions(-)

diff --git a/pom.xml b/pom.xml
index c6a2586..088e2de 100644
--- a/pom.xml
+++ b/pom.xml
@@ -11,7 +11,7 @@
     <artifactId>rocketmq-connect-redis</artifactId>
 
     <properties>
-        <redis.replicator.version>3.3.0</redis.replicator.version>
+        <redis.replicator.version>3.4.0</redis.replicator.version>
         <!-- Compiler settings properties -->
         <maven.compiler.source>1.8</maven.compiler.source>
         <maven.compiler.target>1.8</maven.compiler.target>
diff --git a/src/main/java/org/apache/rocketmq/connect/redis/common/Options.java b/src/main/java/org/apache/rocketmq/connect/redis/common/Options.java
index 0a02bd4..71cc07f 100644
--- a/src/main/java/org/apache/rocketmq/connect/redis/common/Options.java
+++ b/src/main/java/org/apache/rocketmq/connect/redis/common/Options.java
@@ -70,6 +70,7 @@ public class Options<T>{
     public static final Options<Integer> REDIS_RETRYCOUNT = newOption("RETRYCOUNT");
     public static final Options<Boolean> REDIS_FORCE = newOption("FORCE");
     public static final Options<Boolean> REDIS_JUSTID = newOption("JUSTID");
+    public static final Options<Boolean> REDIS_KEEPTTL = newOption("KEEPTTL");
 
 
     private final String name;
diff --git a/src/main/java/org/apache/rocketmq/connect/redis/common/RedisConstants.java b/src/main/java/org/apache/rocketmq/connect/redis/common/RedisConstants.java
index a6d1c0d..fd01c13 100644
--- a/src/main/java/org/apache/rocketmq/connect/redis/common/RedisConstants.java
+++ b/src/main/java/org/apache/rocketmq/connect/redis/common/RedisConstants.java
@@ -29,6 +29,7 @@ public class RedisConstants {
     public static final String INCR = "INCR";
     public static final String IDLE = "IDLE";
     public static final String TIME = "TIME";
+    public static final String KEEPTTL = "KEEPTTL";
     public static final String RETRYCOUNT = "RETRYCOUNT";
     public static final String FORCE = "FORCE";
     public static final String JUSTID = "JUSTID";
diff --git a/src/main/java/org/apache/rocketmq/connect/redis/parser/SetParser.java b/src/main/java/org/apache/rocketmq/connect/redis/parser/SetParser.java
index b21fa1d..4165cc2 100644
--- a/src/main/java/org/apache/rocketmq/connect/redis/parser/SetParser.java
+++ b/src/main/java/org/apache/rocketmq/connect/redis/parser/SetParser.java
@@ -17,12 +17,16 @@
 
 package org.apache.rocketmq.connect.redis.parser;
 
-import io.openmessaging.connector.api.data.FieldType;
+import static com.moilioncircle.redis.replicator.cmd.CommandParsers.toRune;
+import static com.moilioncircle.redis.replicator.util.Strings.isEquals;
+
+import org.apache.rocketmq.connect.redis.common.Options;
 import org.apache.rocketmq.connect.redis.common.RedisConstants;
 import org.apache.rocketmq.connect.redis.pojo.KVEntry;
-import org.apache.rocketmq.connect.redis.common.Options;
 import org.apache.rocketmq.connect.redis.pojo.RedisEntry;
 
+import io.openmessaging.connector.api.data.FieldType;
+
 /**
  * set key value [expiration EX seconds|PX milliseconds] [NX|XX]
  */
@@ -36,33 +40,18 @@ public class SetParser extends AbstractCommandParser {
     public KVEntry handleValue(KVEntry builder, String[] args) {
         builder.value(args[0]);
         int idx = 1;
-        while (args.length > idx){
-            String expiration = args[idx];
-            switch (expiration) {
-                case RedisConstants
-                    .EX:
-                    if (args.length > 2) {
-                        builder.param(Options.REDIS_EX, Integer.parseInt(args[2]));
-                        idx += 2;
-                    }
-                    break;
-                case RedisConstants
-                    .PX:
-                    if (args.length > 2) {
-                        builder.param(Options.REDIS_PX, Long.parseLong(args[2]));
-                        idx += 2;
-                    }
-                    break;
-                case RedisConstants
-                    .NX:
-                    builder.param(Options.REDIS_NX, Boolean.TRUE);
-                    idx += 2;
-                    break;
-                case RedisConstants
-                    .XX:
-                    builder.param(Options.REDIS_XX, Boolean.TRUE);
-                    idx += 2;
-                    break;
+        while (idx < args.length){
+            String param = toRune(args[idx++]);
+            if (isEquals(param, RedisConstants.NX)) {
+                builder.param(Options.REDIS_NX, Boolean.TRUE);
+            } else if (isEquals(param, RedisConstants.XX)) {
+                builder.param(Options.REDIS_XX, Boolean.TRUE);
+            } else if (isEquals(param, RedisConstants.KEEPTTL)) {
+                builder.param(Options.REDIS_KEEPTTL, Boolean.TRUE);
+            } else if (isEquals(param, RedisConstants.EX)) {
+                builder.param(Options.REDIS_EX, Integer.parseInt(args[idx++]));
+            } else if (isEquals(param, RedisConstants.PX)) {
+                builder.param(Options.REDIS_PX, Long.parseLong(args[idx++]));
             }
         }
         return builder;