You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bahir.apache.org by rm...@apache.org on 2017/03/14 13:14:51 UTC

bahir-flink git commit: [BAHIR-95] Add ZREM to Redis commands

Repository: bahir-flink
Updated Branches:
  refs/heads/master 3f180342c -> 130475839


[BAHIR-95] Add ZREM to Redis commands

This closes #13


Project: http://git-wip-us.apache.org/repos/asf/bahir-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/bahir-flink/commit/13047583
Tree: http://git-wip-us.apache.org/repos/asf/bahir-flink/tree/13047583
Diff: http://git-wip-us.apache.org/repos/asf/bahir-flink/diff/13047583

Branch: refs/heads/master
Commit: 13047583906338d8b1428f3ae1b8c8dd8a8a1089
Parents: 3f18034
Author: ariskk <ak...@gmail.com>
Authored: Tue Mar 14 10:13:32 2017 +0000
Committer: Robert Metzger <rm...@apache.org>
Committed: Tue Mar 14 14:14:31 2017 +0100

----------------------------------------------------------------------
 flink-connector-redis/README.md                    |  5 ++++-
 .../streaming/connectors/redis/RedisSink.java      |  3 +++
 .../common/container/RedisClusterContainer.java    | 12 ++++++++++++
 .../common/container/RedisCommandsContainer.java   |  8 ++++++++
 .../redis/common/container/RedisContainer.java     | 17 +++++++++++++++++
 .../redis/common/mapper/RedisCommand.java          |  5 +++++
 .../connectors/redis/RedisSinkITCase.java          | 15 ++++++++++++---
 7 files changed, 61 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/13047583/flink-connector-redis/README.md
----------------------------------------------------------------------
diff --git a/flink-connector-redis/README.md b/flink-connector-redis/README.md
index 0748a92..87da6d3 100644
--- a/flink-connector-redis/README.md
+++ b/flink-connector-redis/README.md
@@ -141,6 +141,9 @@ This section gives a description of all the available data types and what Redis
         </tr>
         <tr>
             <td>SORTED_SET</td><td><a href="http://redis.io/commands/zadd">ZADD</a></td>
-        </tr>                
+        </tr>
+        <tr>
+            <td>SORTED_SET</td><td><a href="http://redis.io/commands/zrem">ZREM</a></td>
+        </tr>
       </tbody>
 </table>

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/13047583/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java
----------------------------------------------------------------------
diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java
index 688f94a..9138862 100644
--- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java
@@ -151,6 +151,9 @@ public class RedisSink<IN> extends RichSinkFunction<IN> {
             case ZADD:
                 this.redisCommandsContainer.zadd(this.additionalKey, value, key);
                 break;
+            case ZREM:
+                this.redisCommandsContainer.zrem(this.additionalKey, key);
+                break;
             case HSET:
                 this.redisCommandsContainer.hset(this.additionalKey, key, value);
                 break;

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/13047583/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java
----------------------------------------------------------------------
diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java
index cc1d626..ba733f7 100644
--- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java
@@ -160,6 +160,18 @@ public class RedisClusterContainer implements RedisCommandsContainer, Closeable
         }
     }
 
+    @Override
+    public void zrem(final String key, final String element) {
+        try {
+            jedisCluster.zrem(key, element);
+        } catch (Exception e) {
+            if (LOG.isErrorEnabled()) {
+                LOG.error("Cannot send Redis message with command ZREM to set {} error message {}",
+                    key, e.getMessage());
+            }
+        }
+    }
+
     /**
      * Closes the {@link JedisCluster}.
      */

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/13047583/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainer.java
----------------------------------------------------------------------
diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainer.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainer.java
index 78771f1..5d7993c 100644
--- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainer.java
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainer.java
@@ -107,6 +107,14 @@ public interface RedisCommandsContainer extends Serializable {
     void zadd(String key, String score, String element);
 
     /**
+     * Removes the specified member from the sorted set stored at key.
+     *
+     * @param key The name of the Sorted Set
+     * @param element  element to be removed
+     */
+    void zrem(String key, String element);
+
+    /**
      * Close the Jedis container.
      *
      * @throws IOException if the instance can not be closed properly

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/13047583/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java
----------------------------------------------------------------------
diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java
index fb73a27..b862ea4 100644
--- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java
@@ -221,6 +221,23 @@ public class RedisContainer implements RedisCommandsContainer, Closeable {
         }
     }
 
+    @Override
+    public void zrem(final String key, final String element) {
+        Jedis jedis = null;
+        try {
+            jedis = getInstance();
+            jedis.zrem(key, element);
+        } catch (Exception e) {
+            if (LOG.isErrorEnabled()) {
+                LOG.error("Cannot send Redis message with command ZREM to set {} error message {}",
+                        key, e.getMessage());
+            }
+            throw e;
+        } finally {
+            releaseInstance(jedis);
+        }
+    }
+
     /**
      * Returns Jedis instance from the pool.
      *

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/13047583/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommand.java
----------------------------------------------------------------------
diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommand.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommand.java
index cf9842c..019ad46 100644
--- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommand.java
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommand.java
@@ -61,6 +61,11 @@ public enum RedisCommand {
     ZADD(RedisDataType.SORTED_SET),
 
     /**
+     * Removes the specified members from the sorted set stored at key.
+     */
+    ZREM(RedisDataType.SORTED_SET),
+
+    /**
      * Sets field in the hash stored at key to value. If key does not exist,
      * a new key holding a hash is created. If field already exists in the hash, it is overwritten.
      */

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/13047583/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkITCase.java
----------------------------------------------------------------------
diff --git a/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkITCase.java b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkITCase.java
index e071894..47544f7 100644
--- a/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkITCase.java
+++ b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkITCase.java
@@ -35,6 +35,7 @@ public class RedisSinkITCase extends RedisITCaseBase {
 
     private FlinkJedisPoolConfig jedisPoolConfig;
     private static final Long NUM_ELEMENTS = 20L;
+    private static final Long ZERO = 0L;
     private static final String REDIS_KEY = "TEST_KEY";
     private static final String REDIS_ADDITIONAL_KEY = "TEST_ADDITIONAL_KEY";
 
@@ -97,14 +98,22 @@ public class RedisSinkITCase extends RedisITCaseBase {
     @Test
     public void testRedisSortedSetDataType() throws Exception {
         DataStreamSource<Tuple2<String, String>> source = env.addSource(new TestSourceFunctionSortedSet());
-        RedisSink<Tuple2<String, String>> redisSink = new RedisSink<>(jedisPoolConfig,
+        RedisSink<Tuple2<String, String>> redisZaddSink = new RedisSink<>(jedisPoolConfig,
             new RedisAdditionalDataMapper(RedisCommand.ZADD));
 
-        source.addSink(redisSink);
-        env.execute("Test Redis Sorted Set Data Type");
+        source.addSink(redisZaddSink);
+        env.execute("Test ZADD");
 
         assertEquals(NUM_ELEMENTS, jedis.zcard(REDIS_ADDITIONAL_KEY));
 
+        RedisSink<Tuple2<String, String>> redisZremSink = new RedisSink<>(jedisPoolConfig,
+                new RedisAdditionalDataMapper(RedisCommand.ZREM));
+
+        source.addSink(redisZremSink);
+        env.execute("Test ZREM");
+
+        assertEquals(ZERO, jedis.zcard(REDIS_ADDITIONAL_KEY));
+
         jedis.del(REDIS_ADDITIONAL_KEY);
     }