You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bahir.apache.org by es...@apache.org on 2023/05/30 16:07:51 UTC
[bahir-flink] branch master updated: [BAHIR-201] Redis: add hdel command
This is an automated email from the ASF dual-hosted git repository.
eskabetxe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bahir-flink.git
The following commit(s) were added to refs/heads/master by this push:
new a03fa8c [BAHIR-201] Redis: add hdel command
a03fa8c is described below
commit a03fa8c1c9320ff5b729bf75c2717c0e832fa5f1
Author: Hyeonho Kim <jw...@gmail.com>
AuthorDate: Wed May 31 01:07:45 2023 +0900
[BAHIR-201] Redis: add hdel command
---
.../flink/streaming/connectors/redis/RedisSink.java | 3 +++
.../redis/common/container/RedisClusterContainer.java | 13 +++++++++++++
.../redis/common/container/RedisCommandsContainer.java | 8 ++++++++
.../redis/common/container/RedisContainer.java | 17 +++++++++++++++++
.../connectors/redis/common/mapper/RedisCommand.java | 6 ++++++
.../streaming/connectors/redis/RedisSinkITCase.java | 12 ++++++++++--
6 files changed, 57 insertions(+), 2 deletions(-)
diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java
index 57f171d..0101339 100644
--- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java
@@ -180,6 +180,9 @@ public class RedisSink<IN> extends RichSinkFunction<IN> {
case HINCRBY:
this.redisCommandsContainer.hincrBy(optAdditionalKey.orElse(this.additionalKey), key, Long.valueOf(value), optAdditionalTTL.orElse(this.additionalTTL));
break;
+ case HDEL:
+ this.redisCommandsContainer.hdel(optAdditionalKey.orElse(this.additionalKey), key);
+ break;
case INCRBY:
this.redisCommandsContainer.incrBy(key, Long.valueOf(value));
break;
diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java
index d2aa13e..5c67aae 100644
--- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java
@@ -81,6 +81,19 @@ public class RedisClusterContainer implements RedisCommandsContainer, Closeable
}
}
+ @Override
+ public void hdel(String key, String hashField) {
+ try {
+ jedisCluster.hdel(key, hashField);
+ } catch (Exception e) {
+ if (LOG.isErrorEnabled()) {
+ LOG.error("Cannot send Redis message with command HDEL to hash {} of key {} error message {}",
+ hashField, key, e.getMessage());
+ }
+ throw e;
+ }
+ }
+
@Override
public void rpush(final String listName, final String value) {
try {
diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainer.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainer.java
index 1d43dd5..9fbad93 100644
--- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainer.java
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainer.java
@@ -46,6 +46,14 @@ public interface RedisCommandsContainer extends Serializable {
void hincrBy(String key, String hashField, Long value, Integer ttl);
+ /**
+ * Removes the specified field from the hash stored at key.
+ * Specified fields that do not exist within this hash are ignored.
+ * @param key
+ * @param hashField
+ */
+ void hdel(String key, String hashField);
+
/**
* Insert the specified value at the tail of the list stored at key.
* If key does not exist, it is created as empty list before performing the push operation.
diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java
index 539192b..955fa31 100644
--- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java
@@ -125,6 +125,23 @@ public class RedisContainer implements RedisCommandsContainer, Closeable {
}
}
+ @Override
+ public void hdel(String key, String hashField) {
+ Jedis jedis = null;
+ try {
+ jedis = getInstance();
+ jedis.hdel(key, hashField);
+ } catch (Exception e) {
+ if (LOG.isErrorEnabled()) {
+ LOG.error("Cannot send Redis message with command HDEL to key {} and hashField {} error message {}",
+ key, hashField, e.getMessage());
+ }
+ throw e;
+ } finally {
+ releaseInstance(jedis);
+ }
+ }
+
@Override
public void rpush(final String listName, final String value) {
Jedis jedis = null;
diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommand.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommand.java
index 0e7d3af..1e48e7f 100644
--- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommand.java
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommand.java
@@ -79,6 +79,12 @@ public enum RedisCommand {
*/
HSET(RedisDataType.HASH),
+ /**
+ * Removes the specified field from the hash stored at key.
+ * Specified fields that do not exist within this hash are ignored.
+ */
+ HDEL(RedisDataType.HASH),
+
HINCRBY(RedisDataType.HINCRBY),
/**
diff --git a/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkITCase.java b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkITCase.java
index 5858eb1..72fdf09 100644
--- a/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkITCase.java
+++ b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkITCase.java
@@ -142,15 +142,23 @@ public class RedisSinkITCase extends RedisITCaseBase {
@Test
public void testRedisHashDataType() throws Exception {
DataStreamSource<Tuple2<String, String>> source = env.addSource(new TestSourceFunctionHash());
- RedisSink<Tuple2<String, String>> redisSink = new RedisSink<>(jedisPoolConfig,
+ RedisSink<Tuple2<String, String>> redisHsetSink = new RedisSink<>(jedisPoolConfig,
new RedisAdditionalDataMapper(RedisCommand.HSET));
- source.addSink(redisSink);
+ source.addSink(redisHsetSink);
env.execute("Test Redis Hash Data Type");
assertEquals(NUM_ELEMENTS.longValue(), jedis.hlen(REDIS_ADDITIONAL_KEY));
assertEquals(REDIS_NOT_ASSOCIATED_EXPIRE_FLAG.longValue(), jedis.ttl(REDIS_ADDITIONAL_KEY));
+ RedisSink<Tuple2<String, String>> redisHdelSink = new RedisSink<>(jedisPoolConfig,
+ new RedisAdditionalDataMapper(RedisCommand.HDEL));
+
+ source.addSink(redisHdelSink);
+ env.execute("Test Redis Hash Data Type");
+
+ assertEquals(ZERO.longValue(), jedis.hlen(REDIS_ADDITIONAL_KEY));
+
jedis.del(REDIS_ADDITIONAL_KEY);
}