You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bahir.apache.org by lr...@apache.org on 2019/07/03 22:11:19 UTC

[bahir-flink] branch master updated: [BAHIR-85] Enable changing additional key without restarting (#60)

This is an automated email from the ASF dual-hosted git repository.

lresende pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bahir-flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 4f92704  [BAHIR-85] Enable changing additional key without restarting (#60)
4f92704 is described below

commit 4f92704328f5e4be4a6493193c132461cf9b2cc3
Author: Ton van Bart <to...@gmail.com>
AuthorDate: Thu Jul 4 00:11:15 2019 +0200

    [BAHIR-85] Enable changing additional key without restarting (#60)
    
    We have a use case where we want to sink data to Redis as hashes but have the hashes stored under different keys which are also extracted from the data. This change makes this possible in a backward-compatible manner.
---
 .../apache/flink/streaming/connectors/redis/RedisSink.java   |  8 +++++---
 .../connectors/redis/common/mapper/RedisMapper.java          | 12 ++++++++++++
 2 files changed, 17 insertions(+), 3 deletions(-)

diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java
index 6a03f11..e468772 100644
--- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java
@@ -35,6 +35,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.Objects;
+import java.util.Optional;
 
 /**
  * A sink that delivers data to a Redis channel using the Jedis client.
@@ -128,6 +129,7 @@ public class RedisSink<IN> extends RichSinkFunction<IN> {
     public void invoke(IN input, Context context) throws Exception {
         String key = redisSinkMapper.getKeyFromData(input);
         String value = redisSinkMapper.getValueFromData(input);
+        Optional<String> optAdditionalKey = redisSinkMapper.getAdditionalKey(input);
 
         switch (redisCommand) {
             case RPUSH:
@@ -149,13 +151,13 @@ public class RedisSink<IN> extends RichSinkFunction<IN> {
                 this.redisCommandsContainer.publish(key, value);
                 break;
             case ZADD:
-                this.redisCommandsContainer.zadd(this.additionalKey, value, key);
+                this.redisCommandsContainer.zadd(optAdditionalKey.orElse(this.additionalKey), value, key);
                 break;
             case ZREM:
-                this.redisCommandsContainer.zrem(this.additionalKey, key);
+                this.redisCommandsContainer.zrem(optAdditionalKey.orElse(this.additionalKey), key);
                 break;
             case HSET:
-                this.redisCommandsContainer.hset(this.additionalKey, key, value);
+                this.redisCommandsContainer.hset(optAdditionalKey.orElse(this.additionalKey), key, value);
                 break;
             default:
                 throw new IllegalArgumentException("Cannot process such data type: " + redisCommand);
diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisMapper.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisMapper.java
index b2580a7..96df75e 100644
--- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisMapper.java
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisMapper.java
@@ -19,6 +19,7 @@ package org.apache.flink.streaming.connectors.redis.common.mapper;
 import org.apache.flink.api.common.functions.Function;
 
 import java.io.Serializable;
+import java.util.Optional;
 
 /**
  * Function that creates the description how the input data should be mapped to redis type.
@@ -63,4 +64,15 @@ public interface RedisMapper<T> extends Function, Serializable {
      * @return value
      */
     String getValueFromData(T data);
+
+    /**
+     * Extracts the additional key from data as an {@code Optional<String>}.
+     * The default implementation returns an empty Optional.
+     *
+     * @param data the source data from which to extract the additional key
+     * @return an {@code Optional} containing the additional key, empty by default
+     */
+    default Optional<String> getAdditionalKey(T data) {
+        return Optional.empty();
+    }
 }