You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bahir.apache.org by es...@apache.org on 2023/05/30 16:07:51 UTC

[bahir-flink] branch master updated: [BAHIR-201] Redis: add hdel command

This is an automated email from the ASF dual-hosted git repository.

eskabetxe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bahir-flink.git


The following commit(s) were added to refs/heads/master by this push:
     new a03fa8c  [BAHIR-201] Redis: add hdel command
a03fa8c is described below

commit a03fa8c1c9320ff5b729bf75c2717c0e832fa5f1
Author: Hyeonho Kim <jw...@gmail.com>
AuthorDate: Wed May 31 01:07:45 2023 +0900

    [BAHIR-201] Redis: add hdel command
---
 .../flink/streaming/connectors/redis/RedisSink.java     |  3 +++
 .../redis/common/container/RedisClusterContainer.java   | 13 +++++++++++++
 .../redis/common/container/RedisCommandsContainer.java  |  8 ++++++++
 .../redis/common/container/RedisContainer.java          | 17 +++++++++++++++++
 .../connectors/redis/common/mapper/RedisCommand.java    |  6 ++++++
 .../streaming/connectors/redis/RedisSinkITCase.java     | 12 ++++++++++--
 6 files changed, 57 insertions(+), 2 deletions(-)

diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java
index 57f171d..0101339 100644
--- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java
@@ -180,6 +180,9 @@ public class RedisSink<IN> extends RichSinkFunction<IN> {
             case HINCRBY:
                 this.redisCommandsContainer.hincrBy(optAdditionalKey.orElse(this.additionalKey), key, Long.valueOf(value), optAdditionalTTL.orElse(this.additionalTTL));
                 break;
+            case HDEL:
+                this.redisCommandsContainer.hdel(optAdditionalKey.orElse(this.additionalKey), key);
+                break;
             case INCRBY:
                 this.redisCommandsContainer.incrBy(key, Long.valueOf(value));
                 break;
diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java
index d2aa13e..5c67aae 100644
--- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java
@@ -81,6 +81,19 @@ public class RedisClusterContainer implements RedisCommandsContainer, Closeable
         }
     }
 
+    @Override
+    public void hdel(String key, String hashField) {
+        try {
+            jedisCluster.hdel(key, hashField);
+        } catch (Exception e) {
+            if (LOG.isErrorEnabled()) {
+                LOG.error("Cannot send Redis message with command HDEL to hash {} of key {} error message {}",
+                    hashField, key, e.getMessage());
+            }
+            throw e;
+        }
+    }
+
     @Override
     public void rpush(final String listName, final String value) {
         try {
diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainer.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainer.java
index 1d43dd5..9fbad93 100644
--- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainer.java
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainer.java
@@ -46,6 +46,14 @@ public interface RedisCommandsContainer extends Serializable {
 
     void hincrBy(String key, String hashField, Long value, Integer ttl);
 
+    /**
+     * Removes the specified field from the hash stored at key.
+     * Specified fields that do not exist within this hash are ignored.
+     * @param key
+     * @param hashField
+     */
+    void hdel(String key, String hashField);
+
     /**
      * Insert the specified value at the tail of the list stored at key.
      * If key does not exist, it is created as empty list before performing the push operation.
diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java
index 539192b..955fa31 100644
--- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java
@@ -125,6 +125,23 @@ public class RedisContainer implements RedisCommandsContainer, Closeable {
         }
     }
 
+    @Override
+    public void hdel(String key, String hashField) {
+        Jedis jedis = null;
+        try {
+            jedis = getInstance();
+            jedis.hdel(key, hashField);
+        } catch (Exception e) {
+            if (LOG.isErrorEnabled()) {
+                LOG.error("Cannot send Redis message with command HDEL to key {} and hashField {} error message {}",
+                    key, hashField, e.getMessage());
+            }
+            throw e;
+        } finally {
+            releaseInstance(jedis);
+        }
+    }
+
     @Override
     public void rpush(final String listName, final String value) {
         Jedis jedis = null;
diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommand.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommand.java
index 0e7d3af..1e48e7f 100644
--- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommand.java
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommand.java
@@ -79,6 +79,12 @@ public enum RedisCommand {
      */
     HSET(RedisDataType.HASH),
 
+    /**
+     * Removes the specified field from the hash stored at key.
+     * Specified fields that do not exist within this hash are ignored.
+     */
+    HDEL(RedisDataType.HASH),
+
     HINCRBY(RedisDataType.HINCRBY),
 
     /**
diff --git a/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkITCase.java b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkITCase.java
index 5858eb1..72fdf09 100644
--- a/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkITCase.java
+++ b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkITCase.java
@@ -142,15 +142,23 @@ public class RedisSinkITCase extends RedisITCaseBase {
     @Test
     public void testRedisHashDataType() throws Exception {
         DataStreamSource<Tuple2<String, String>> source = env.addSource(new TestSourceFunctionHash());
-        RedisSink<Tuple2<String, String>> redisSink = new RedisSink<>(jedisPoolConfig,
+        RedisSink<Tuple2<String, String>> redisHsetSink = new RedisSink<>(jedisPoolConfig,
             new RedisAdditionalDataMapper(RedisCommand.HSET));
 
-        source.addSink(redisSink);
+        source.addSink(redisHsetSink);
         env.execute("Test Redis Hash Data Type");
 
         assertEquals(NUM_ELEMENTS.longValue(), jedis.hlen(REDIS_ADDITIONAL_KEY));
         assertEquals(REDIS_NOT_ASSOCIATED_EXPIRE_FLAG.longValue(), jedis.ttl(REDIS_ADDITIONAL_KEY));
 
+        RedisSink<Tuple2<String, String>> redisHdelSink = new RedisSink<>(jedisPoolConfig,
+            new RedisAdditionalDataMapper(RedisCommand.HDEL));
+
+        source.addSink(redisHdelSink);
+        env.execute("Test Redis Hash Data Type");
+
+        assertEquals(ZERO.longValue(), jedis.hlen(REDIS_ADDITIONAL_KEY));
+
         jedis.del(REDIS_ADDITIONAL_KEY);
     }