You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bahir.apache.org by rm...@apache.org on 2017/03/14 13:14:51 UTC
bahir-flink git commit: [BAHIR-95] Add ZREM to Redis commands
Repository: bahir-flink
Updated Branches:
refs/heads/master 3f180342c -> 130475839
[BAHIR-95] Add ZREM to Redis commands
This closes #13
Project: http://git-wip-us.apache.org/repos/asf/bahir-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/bahir-flink/commit/13047583
Tree: http://git-wip-us.apache.org/repos/asf/bahir-flink/tree/13047583
Diff: http://git-wip-us.apache.org/repos/asf/bahir-flink/diff/13047583
Branch: refs/heads/master
Commit: 13047583906338d8b1428f3ae1b8c8dd8a8a1089
Parents: 3f18034
Author: ariskk <ak...@gmail.com>
Authored: Tue Mar 14 10:13:32 2017 +0000
Committer: Robert Metzger <rm...@apache.org>
Committed: Tue Mar 14 14:14:31 2017 +0100
----------------------------------------------------------------------
flink-connector-redis/README.md | 5 ++++-
.../streaming/connectors/redis/RedisSink.java | 3 +++
.../common/container/RedisClusterContainer.java | 12 ++++++++++++
.../common/container/RedisCommandsContainer.java | 8 ++++++++
.../redis/common/container/RedisContainer.java | 17 +++++++++++++++++
.../redis/common/mapper/RedisCommand.java | 5 +++++
.../connectors/redis/RedisSinkITCase.java | 15 ++++++++++++---
7 files changed, 61 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/13047583/flink-connector-redis/README.md
----------------------------------------------------------------------
diff --git a/flink-connector-redis/README.md b/flink-connector-redis/README.md
index 0748a92..87da6d3 100644
--- a/flink-connector-redis/README.md
+++ b/flink-connector-redis/README.md
@@ -141,6 +141,9 @@ This section gives a description of all the available data types and what Redis
</tr>
<tr>
<td>SORTED_SET</td><td><a href="http://redis.io/commands/zadd">ZADD</a></td>
- </tr>
+ </tr>
+ <tr>
+ <td>SORTED_SET</td><td><a href="http://redis.io/commands/zrem">ZREM</a></td>
+ </tr>
</tbody>
</table>
http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/13047583/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java
----------------------------------------------------------------------
diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java
index 688f94a..9138862 100644
--- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java
@@ -151,6 +151,9 @@ public class RedisSink<IN> extends RichSinkFunction<IN> {
case ZADD:
this.redisCommandsContainer.zadd(this.additionalKey, value, key);
break;
+ case ZREM:
+ this.redisCommandsContainer.zrem(this.additionalKey, key);
+ break;
case HSET:
this.redisCommandsContainer.hset(this.additionalKey, key, value);
break;
http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/13047583/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java
----------------------------------------------------------------------
diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java
index cc1d626..ba733f7 100644
--- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java
@@ -160,6 +160,18 @@ public class RedisClusterContainer implements RedisCommandsContainer, Closeable
}
}
+ /**
+  * Removes {@code element} from the sorted set stored at {@code key} through {@code jedisCluster}.
+  * Any failure is logged at error level and then rethrown to the caller.
+  *
+  * @param key The name of the Sorted Set
+  * @param element element to be removed
+  */
+ @Override
+ public void zrem(final String key, final String element) {
+     try {
+         jedisCluster.zrem(key, element);
+     } catch (Exception e) {
+         // The guard has to match the level that is logged: a debug-level guard
+         // suppresses this error message whenever debug logging is disabled.
+         if (LOG.isErrorEnabled()) {
+             LOG.error("Cannot send Redis message with command ZREM to set {} error message {}",
+                 key, e.getMessage());
+         }
+         // Propagate the failure instead of swallowing it, matching RedisContainer#zrem.
+         throw e;
+     }
+ }
+
/**
* Closes the {@link JedisCluster}.
*/
http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/13047583/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainer.java
----------------------------------------------------------------------
diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainer.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainer.java
index 78771f1..5d7993c 100644
--- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainer.java
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainer.java
@@ -107,6 +107,14 @@ public interface RedisCommandsContainer extends Serializable {
void zadd(String key, String score, String element);
/**
+ * Removes the specified member from the sorted set stored at key.
+ *
+ * @param key the name of the sorted set to remove the member from
+ * @param element the member to be removed from the sorted set
+ */
+ void zrem(String key, String element);
+
+ /**
* Close the Jedis container.
*
* @throws IOException if the instance can not be closed properly
http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/13047583/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java
----------------------------------------------------------------------
diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java
index fb73a27..b862ea4 100644
--- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java
@@ -221,6 +221,23 @@ public class RedisContainer implements RedisCommandsContainer, Closeable {
}
}
+ @Override
+ public void zrem(final String key, final String element) {
+ Jedis jedis = null;
+ try {
+ jedis = getInstance();
+ jedis.zrem(key, element);
+ } catch (Exception e) {
+ if (LOG.isErrorEnabled()) {
+ LOG.error("Cannot send Redis message with command ZREM to set {} error message {}",
+ key, e.getMessage());
+ }
+ throw e;
+ } finally {
+ releaseInstance(jedis);
+ }
+ }
+
/**
* Returns Jedis instance from the pool.
*
http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/13047583/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommand.java
----------------------------------------------------------------------
diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommand.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommand.java
index cf9842c..019ad46 100644
--- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommand.java
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommand.java
@@ -61,6 +61,11 @@ public enum RedisCommand {
ZADD(RedisDataType.SORTED_SET),
/**
+ * Removes the specified member from the sorted set stored at key.
+ */
+ ZREM(RedisDataType.SORTED_SET),
+
+ /**
* Sets field in the hash stored at key to value. If key does not exist,
* a new key holding a hash is created. If field already exists in the hash, it is overwritten.
*/
http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/13047583/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkITCase.java
----------------------------------------------------------------------
diff --git a/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkITCase.java b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkITCase.java
index e071894..47544f7 100644
--- a/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkITCase.java
+++ b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkITCase.java
@@ -35,6 +35,7 @@ public class RedisSinkITCase extends RedisITCaseBase {
private FlinkJedisPoolConfig jedisPoolConfig;
private static final Long NUM_ELEMENTS = 20L;
+ private static final Long ZERO = 0L;
private static final String REDIS_KEY = "TEST_KEY";
private static final String REDIS_ADDITIONAL_KEY = "TEST_ADDITIONAL_KEY";
@@ -97,14 +98,22 @@ public class RedisSinkITCase extends RedisITCaseBase {
@Test
public void testRedisSortedSetDataType() throws Exception {
DataStreamSource<Tuple2<String, String>> source = env.addSource(new TestSourceFunctionSortedSet());
- RedisSink<Tuple2<String, String>> redisSink = new RedisSink<>(jedisPoolConfig,
+ RedisSink<Tuple2<String, String>> redisZaddSink = new RedisSink<>(jedisPoolConfig,
new RedisAdditionalDataMapper(RedisCommand.ZADD));
- source.addSink(redisSink);
- env.execute("Test Redis Sorted Set Data Type");
+ source.addSink(redisZaddSink);
+ env.execute("Test ZADD");
assertEquals(NUM_ELEMENTS, jedis.zcard(REDIS_ADDITIONAL_KEY));
+ RedisSink<Tuple2<String, String>> redisZremSink = new RedisSink<>(jedisPoolConfig,
+ new RedisAdditionalDataMapper(RedisCommand.ZREM));
+
+ source.addSink(redisZremSink);
+ env.execute("Test ZREM");
+
+ assertEquals(ZERO, jedis.zcard(REDIS_ADDITIONAL_KEY));
+
jedis.del(REDIS_ADDITIONAL_KEY);
}