You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/03/02 03:23:47 UTC
[rocketmq-connect] 03/08: Support redis 6 (#510)
This is an automated email from the ASF dual-hosted git repository.
zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git
commit 1290dde8f7410e63b9bcf562bd6ccf76fb982fec
Author: Baoyi Chen <ch...@qq.com>
AuthorDate: Wed Feb 12 14:39:07 2020 +0800
Support redis 6 (#510)
---
pom.xml | 2 +-
.../rocketmq/connect/redis/common/Options.java | 1 +
.../connect/redis/common/RedisConstants.java | 1 +
.../rocketmq/connect/redis/parser/SetParser.java | 47 +++++++++-------------
4 files changed, 21 insertions(+), 30 deletions(-)
diff --git a/pom.xml b/pom.xml
index c6a2586..088e2de 100644
--- a/pom.xml
+++ b/pom.xml
@@ -11,7 +11,7 @@
<artifactId>rocketmq-connect-redis</artifactId>
<properties>
- <redis.replicator.version>3.3.0</redis.replicator.version>
+ <redis.replicator.version>3.4.0</redis.replicator.version>
<!-- Compiler settings properties -->
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
diff --git a/src/main/java/org/apache/rocketmq/connect/redis/common/Options.java b/src/main/java/org/apache/rocketmq/connect/redis/common/Options.java
index 0a02bd4..71cc07f 100644
--- a/src/main/java/org/apache/rocketmq/connect/redis/common/Options.java
+++ b/src/main/java/org/apache/rocketmq/connect/redis/common/Options.java
@@ -70,6 +70,7 @@ public class Options<T>{
public static final Options<Integer> REDIS_RETRYCOUNT = newOption("RETRYCOUNT");
public static final Options<Boolean> REDIS_FORCE = newOption("FORCE");
public static final Options<Boolean> REDIS_JUSTID = newOption("JUSTID");
+ public static final Options<Boolean> REDIS_KEEPTTL = newOption("KEEPTTL");
private final String name;
diff --git a/src/main/java/org/apache/rocketmq/connect/redis/common/RedisConstants.java b/src/main/java/org/apache/rocketmq/connect/redis/common/RedisConstants.java
index a6d1c0d..fd01c13 100644
--- a/src/main/java/org/apache/rocketmq/connect/redis/common/RedisConstants.java
+++ b/src/main/java/org/apache/rocketmq/connect/redis/common/RedisConstants.java
@@ -29,6 +29,7 @@ public class RedisConstants {
public static final String INCR = "INCR";
public static final String IDLE = "IDLE";
public static final String TIME = "TIME";
+ public static final String KEEPTTL = "KEEPTTL";
public static final String RETRYCOUNT = "RETRYCOUNT";
public static final String FORCE = "FORCE";
public static final String JUSTID = "JUSTID";
diff --git a/src/main/java/org/apache/rocketmq/connect/redis/parser/SetParser.java b/src/main/java/org/apache/rocketmq/connect/redis/parser/SetParser.java
index b21fa1d..4165cc2 100644
--- a/src/main/java/org/apache/rocketmq/connect/redis/parser/SetParser.java
+++ b/src/main/java/org/apache/rocketmq/connect/redis/parser/SetParser.java
@@ -17,12 +17,16 @@
package org.apache.rocketmq.connect.redis.parser;
-import io.openmessaging.connector.api.data.FieldType;
+import static com.moilioncircle.redis.replicator.cmd.CommandParsers.toRune;
+import static com.moilioncircle.redis.replicator.util.Strings.isEquals;
+
+import org.apache.rocketmq.connect.redis.common.Options;
import org.apache.rocketmq.connect.redis.common.RedisConstants;
import org.apache.rocketmq.connect.redis.pojo.KVEntry;
-import org.apache.rocketmq.connect.redis.common.Options;
import org.apache.rocketmq.connect.redis.pojo.RedisEntry;
+import io.openmessaging.connector.api.data.FieldType;
+
/**
* set key value [expiration EX seconds|PX milliseconds] [NX|XX]
*/
@@ -36,33 +40,18 @@ public class SetParser extends AbstractCommandParser {
public KVEntry handleValue(KVEntry builder, String[] args) {
builder.value(args[0]);
int idx = 1;
- while (args.length > idx){
- String expiration = args[idx];
- switch (expiration) {
- case RedisConstants
- .EX:
- if (args.length > 2) {
- builder.param(Options.REDIS_EX, Integer.parseInt(args[2]));
- idx += 2;
- }
- break;
- case RedisConstants
- .PX:
- if (args.length > 2) {
- builder.param(Options.REDIS_PX, Long.parseLong(args[2]));
- idx += 2;
- }
- break;
- case RedisConstants
- .NX:
- builder.param(Options.REDIS_NX, Boolean.TRUE);
- idx += 2;
- break;
- case RedisConstants
- .XX:
- builder.param(Options.REDIS_XX, Boolean.TRUE);
- idx += 2;
- break;
+ while (idx < args.length){
+ String param = toRune(args[idx++]);
+ if (isEquals(param, RedisConstants.NX)) {
+ builder.param(Options.REDIS_NX, Boolean.TRUE);
+ } else if (isEquals(param, RedisConstants.XX)) {
+ builder.param(Options.REDIS_XX, Boolean.TRUE);
+ } else if (isEquals(param, RedisConstants.KEEPTTL)) {
+ builder.param(Options.REDIS_KEEPTTL, Boolean.TRUE);
+ } else if (isEquals(param, RedisConstants.EX)) {
+ builder.param(Options.REDIS_EX, Integer.parseInt(args[idx++]));
+ } else if (isEquals(param, RedisConstants.PX)) {
+ builder.param(Options.REDIS_PX, Long.parseLong(args[idx++]));
}
}
return builder;