You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bahir.apache.org by lr...@apache.org on 2016/08/20 02:11:01 UTC
[5/5] bahir-flink git commit: [BAHIR-55] Add Redis connector from
Flink
[BAHIR-55] Add Redis connector from Flink
Closes #1
Project: http://git-wip-us.apache.org/repos/asf/bahir-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/bahir-flink/commit/b2955a74
Tree: http://git-wip-us.apache.org/repos/asf/bahir-flink/tree/b2955a74
Diff: http://git-wip-us.apache.org/repos/asf/bahir-flink/diff/b2955a74
Branch: refs/heads/master
Commit: b2955a749e39cab55612917e4d5e702781f1e87c
Parents: 9966a0c
Author: Robert Metzger <rm...@apache.org>
Authored: Fri Aug 19 10:55:30 2016 +0200
Committer: Luciano Resende <lr...@apache.org>
Committed: Fri Aug 19 19:09:27 2016 -0700
----------------------------------------------------------------------
dev/checkstyle.xml | 11 +-
flink-connector-redis/pom.xml | 78 ++++++
.../streaming/connectors/redis/RedisSink.java | 188 ++++++++++++++
.../streaming/connectors/redis/common/Util.java | 25 ++
.../common/config/FlinkJedisClusterConfig.java | 188 ++++++++++++++
.../common/config/FlinkJedisConfigBase.java | 91 +++++++
.../common/config/FlinkJedisPoolConfig.java | 225 ++++++++++++++++
.../common/config/FlinkJedisSentinelConfig.java | 260 +++++++++++++++++++
.../common/container/RedisClusterContainer.java | 171 ++++++++++++
.../container/RedisCommandsContainer.java | 115 ++++++++
.../RedisCommandsContainerBuilder.java | 117 +++++++++
.../redis/common/container/RedisContainer.java | 252 ++++++++++++++++++
.../redis/common/mapper/RedisCommand.java | 86 ++++++
.../common/mapper/RedisCommandDescription.java | 93 +++++++
.../redis/common/mapper/RedisDataType.java | 66 +++++
.../redis/common/mapper/RedisMapper.java | 66 +++++
.../connectors/redis/RedisITCaseBase.java | 45 ++++
.../redis/RedisSentinelClusterTest.java | 99 +++++++
.../connectors/redis/RedisSinkITCase.java | 233 +++++++++++++++++
.../redis/RedisSinkPublishITCase.java | 137 ++++++++++
.../connectors/redis/RedisSinkTest.java | 143 ++++++++++
.../common/config/FlinkJedisConfigBaseTest.java | 50 ++++
.../common/config/JedisClusterConfigTest.java | 49 ++++
.../common/config/JedisPoolConfigTest.java | 29 +++
.../common/config/JedisSentinelConfigTest.java | 49 ++++
.../mapper/RedisDataTypeDescriptionTest.java | 41 +++
.../src/test/resources/log4j.properties | 27 ++
pom.xml | 11 -
28 files changed, 2928 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/b2955a74/dev/checkstyle.xml
----------------------------------------------------------------------
diff --git a/dev/checkstyle.xml b/dev/checkstyle.xml
index 3de6aa9..7a0558c 100644
--- a/dev/checkstyle.xml
+++ b/dev/checkstyle.xml
@@ -41,7 +41,7 @@
-->
-<module name = "Checker">
+<module name="Checker">
<property name="charset" value="UTF-8"/>
<property name="severity" value="error"/>
@@ -78,10 +78,6 @@
<property name="allowByTailComment" value="true"/>
<property name="allowNonPrintableEscapes" value="true"/>
</module>
- <module name="LineLength">
- <property name="max" value="100"/>
- <property name="ignorePattern" value="^package.*|^import.*|a href|href|http://|https://|ftp://"/>
- </module>
<module name="NoLineWrap"/>
<module name="EmptyBlock">
<property name="option" value="TEXT"/>
@@ -165,7 +161,10 @@
<property name="exceptionVariableName" value="expected"/>
</module>
<module name="CommentsIndentation"/>
- <module name="UnusedImports"/>
+ <module name="UnusedImports">
+ <!-- Allow imports for JavaDocs -->
+ <property name="processJavadoc" value="true"/>
+ </module>
<module name="RedundantImport"/>
<module name="RedundantModifier"/>
</module>
http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/b2955a74/flink-connector-redis/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connector-redis/pom.xml b/flink-connector-redis/pom.xml
new file mode 100644
index 0000000..c34711e
--- /dev/null
+++ b/flink-connector-redis/pom.xml
@@ -0,0 +1,78 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.bahir</groupId>
+ <artifactId>bahir-flink_parent_2.11</artifactId>
+ <version>1.0.0-SNAPSHOT</version>
+ <relativePath>..</relativePath>
+ </parent>
+
+ <artifactId>flink-connector-redis_2.11</artifactId>
+ <name>flink-connector-redis</name>
+
+ <packaging>jar</packaging>
+
+ <properties>
+ <jedis.version>2.8.0</jedis.version>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-java_2.11</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>redis.clients</groupId>
+ <artifactId>jedis</artifactId>
+ <version>${jedis.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.github.kstyrc</groupId>
+ <artifactId>embedded-redis</artifactId>
+ <version>0.6</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-java_2.11</artifactId>
+ <version>${flink.version}</version>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-test-utils_2.11</artifactId>
+ <version>${flink.version}</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+</project>
http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/b2955a74/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java
----------------------------------------------------------------------
diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java
new file mode 100644
index 0000000..688f94a
--- /dev/null
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.redis;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisClusterConfig;
+import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisConfigBase;
+import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
+import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisSentinelConfig;
+import org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainer;
+import org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainerBuilder;
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisDataType;
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Objects;
+
+/**
+ * A sink that delivers data to a Redis channel using the Jedis client.
+ * <p> The sink takes two arguments {@link FlinkJedisConfigBase} and {@link RedisMapper}.
+ * <p> When {@link FlinkJedisPoolConfig} is passed as the first argument,
+ * the sink will create connection using {@link redis.clients.jedis.JedisPool}. Please use this when
+ * you want to connect to a single Redis server.
+ * <p> When {@link FlinkJedisSentinelConfig} is passed as the first argument, the sink will create connection
+ * using {@link redis.clients.jedis.JedisSentinelPool}. Please use this when you want to connect to Sentinel.
+ * <p> Please use {@link FlinkJedisClusterConfig} as the first argument if you want to connect to
+ * a Redis Cluster.
+ *
+ * <p>Example:
+ *
+ * <pre>
+ *{@code
+ *public static class RedisExampleMapper implements RedisMapper<Tuple2<String, String>> {
+ *
+ * private RedisCommand redisCommand;
+ *
+ * public RedisExampleMapper(RedisCommand redisCommand){
+ * this.redisCommand = redisCommand;
+ * }
+ * public RedisCommandDescription getCommandDescription() {
+ * return new RedisCommandDescription(redisCommand, REDIS_ADDITIONAL_KEY);
+ * }
+ * public String getKeyFromData(Tuple2<String, String> data) {
+ * return data.f0;
+ * }
+ * public String getValueFromData(Tuple2<String, String> data) {
+ * return data.f1;
+ * }
+ *}
+ *JedisPoolConfig jedisPoolConfig = new JedisPoolConfig.Builder()
+ * .setHost(REDIS_HOST).setPort(REDIS_PORT).build();
+ *new RedisSink<String>(jedisPoolConfig, new RedisExampleMapper(RedisCommand.LPUSH));
+ *}</pre>
+ *
+ * @param <IN> Type of the elements emitted by this sink
+ */
+public class RedisSink<IN> extends RichSinkFunction<IN> {
+
+ private static final long serialVersionUID = 1L;
+
+ private static final Logger LOG = LoggerFactory.getLogger(RedisSink.class);
+
+ /**
+ * This additional key needed for {@link RedisDataType#HASH} and {@link RedisDataType#SORTED_SET}.
+ * Other {@link RedisDataType} works only with two variable i.e. name of the list and value to be added.
+ * But for {@link RedisDataType#HASH} and {@link RedisDataType#SORTED_SET} we need three variables.
+ * <p>For {@link RedisDataType#HASH} we need hash name, hash key and element.
+ * {@code additionalKey} used as hash name for {@link RedisDataType#HASH}
+ * <p>For {@link RedisDataType#SORTED_SET} we need set name, the element and it's score.
+ * {@code additionalKey} used as set name for {@link RedisDataType#SORTED_SET}
+ */
+ private String additionalKey;
+ private RedisMapper<IN> redisSinkMapper;
+ private RedisCommand redisCommand;
+
+ private FlinkJedisConfigBase flinkJedisConfigBase;
+ private RedisCommandsContainer redisCommandsContainer;
+
+ /**
+ * Creates a new {@link RedisSink} that connects to the Redis server.
+ *
+ * @param flinkJedisConfigBase The configuration of {@link FlinkJedisConfigBase}
+ * @param redisSinkMapper This is used to generate Redis command and key value from incoming elements.
+ */
+ public RedisSink(FlinkJedisConfigBase flinkJedisConfigBase, RedisMapper<IN> redisSinkMapper) {
+ Objects.requireNonNull(flinkJedisConfigBase, "Redis connection pool config should not be null");
+ Objects.requireNonNull(redisSinkMapper, "Redis Mapper can not be null");
+ Objects.requireNonNull(redisSinkMapper.getCommandDescription(), "Redis Mapper data type description can not be null");
+
+ this.flinkJedisConfigBase = flinkJedisConfigBase;
+
+ this.redisSinkMapper = redisSinkMapper;
+ RedisCommandDescription redisCommandDescription = redisSinkMapper.getCommandDescription();
+ this.redisCommand = redisCommandDescription.getCommand();
+ this.additionalKey = redisCommandDescription.getAdditionalKey();
+ }
+
+ /**
+ * Called when new data arrives to the sink, and forwards it to Redis channel.
+ * Depending on the specified Redis data type (see {@link RedisDataType}),
+ * a different Redis command will be applied.
+ * Available commands are RPUSH, LPUSH, SADD, PUBLISH, SET, PFADD, HSET, ZADD.
+ *
+ * @param input The incoming data
+ */
+ @Override
+ public void invoke(IN input) throws Exception {
+ String key = redisSinkMapper.getKeyFromData(input);
+ String value = redisSinkMapper.getValueFromData(input);
+
+ switch (redisCommand) {
+ case RPUSH:
+ this.redisCommandsContainer.rpush(key, value);
+ break;
+ case LPUSH:
+ this.redisCommandsContainer.lpush(key, value);
+ break;
+ case SADD:
+ this.redisCommandsContainer.sadd(key, value);
+ break;
+ case SET:
+ this.redisCommandsContainer.set(key, value);
+ break;
+ case PFADD:
+ this.redisCommandsContainer.pfadd(key, value);
+ break;
+ case PUBLISH:
+ this.redisCommandsContainer.publish(key, value);
+ break;
+ case ZADD:
+ this.redisCommandsContainer.zadd(this.additionalKey, value, key);
+ break;
+ case HSET:
+ this.redisCommandsContainer.hset(this.additionalKey, key, value);
+ break;
+ default:
+ throw new IllegalArgumentException("Cannot process such data type: " + redisCommand);
+ }
+ }
+
+ /**
+ * Initializes the connection to Redis by either cluster or sentinels or single server.
+ *
+ * @throws IllegalArgumentException if jedisPoolConfig, jedisClusterConfig and jedisSentinelConfig are all null
+ */
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ try {
+ this.redisCommandsContainer = RedisCommandsContainerBuilder.build(this.flinkJedisConfigBase);
+ this.redisCommandsContainer.open();
+ } catch (Exception e) {
+ LOG.error("Redis has not been properly initialized: ", e);
+ throw e;
+ }
+ }
+
+ /**
+ * Closes commands container.
+ * @throws IOException if command container is unable to close.
+ */
+ @Override
+ public void close() throws IOException {
+ if (redisCommandsContainer != null) {
+ redisCommandsContainer.close();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/b2955a74/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/Util.java
----------------------------------------------------------------------
diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/Util.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/Util.java
new file mode 100644
index 0000000..b0e38b9
--- /dev/null
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/Util.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common;
+
+public class Util {
+ public static void checkArgument(boolean condition, String message) {
+ if(!condition) {
+ throw new IllegalArgumentException(message);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/b2955a74/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisClusterConfig.java
----------------------------------------------------------------------
diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisClusterConfig.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisClusterConfig.java
new file mode 100644
index 0000000..119ade3
--- /dev/null
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisClusterConfig.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.redis.common.config;
+
+import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
+import org.apache.flink.streaming.connectors.redis.common.Util;
+import redis.clients.jedis.HostAndPort;
+import redis.clients.jedis.Protocol;
+
+import java.net.InetSocketAddress;
+import java.util.HashSet;
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * Configuration for Jedis cluster.
+ */
+public class FlinkJedisClusterConfig extends FlinkJedisConfigBase {
+ private static final long serialVersionUID = 1L;
+
+ private final Set<InetSocketAddress> nodes;
+ private final int maxRedirections;
+
+
+ /**
+ * Jedis cluster configuration.
+ * The list of node is mandatory, and when nodes is not set, it throws NullPointerException.
+ *
+ * @param nodes list of node information for JedisCluster
+ * @param connectionTimeout socket / connection timeout. The default is 2000
+ * @param maxRedirections limit of redirections-how much we'll follow MOVED or ASK
+ * @param maxTotal the maximum number of objects that can be allocated by the pool
+ * @param maxIdle the cap on the number of "idle" instances in the pool
+ * @param minIdle the minimum number of idle objects to maintain in the pool
+ * @throws NullPointerException if parameter {@code nodes} is {@code null}
+ */
+ private FlinkJedisClusterConfig(Set<InetSocketAddress> nodes, int connectionTimeout, int maxRedirections,
+ int maxTotal, int maxIdle, int minIdle) {
+ super(connectionTimeout, maxTotal, maxIdle, minIdle);
+
+ Objects.requireNonNull(nodes, "Node information should be presented");
+ Util.checkArgument(!nodes.isEmpty(), "Redis cluster hosts should not be empty");
+ this.nodes = new HashSet<>(nodes);
+ this.maxRedirections = maxRedirections;
+ }
+
+
+
+ /**
+ * Returns nodes.
+ *
+ * @return list of node information
+ */
+ public Set<HostAndPort> getNodes() {
+ Set<HostAndPort> ret = new HashSet<>();
+ for (InetSocketAddress node : nodes) {
+ ret.add(new HostAndPort(node.getHostName(), node.getPort()));
+ }
+ return ret;
+ }
+
+ /**
+ * Returns limit of redirection.
+ *
+ * @return limit of redirection
+ */
+ public int getMaxRedirections() {
+ return maxRedirections;
+ }
+
+
+ /**
+ * Builder for initializing {@link FlinkJedisClusterConfig}.
+ */
+ public static class Builder {
+ private Set<InetSocketAddress> nodes;
+ private int timeout = Protocol.DEFAULT_TIMEOUT;
+ private int maxRedirections = 5;
+ private int maxTotal = GenericObjectPoolConfig.DEFAULT_MAX_TOTAL;
+ private int maxIdle = GenericObjectPoolConfig.DEFAULT_MAX_IDLE;
+ private int minIdle = GenericObjectPoolConfig.DEFAULT_MIN_IDLE;
+
+ /**
+ * Sets list of node.
+ *
+ * @param nodes list of node
+ * @return Builder itself
+ */
+ public Builder setNodes(Set<InetSocketAddress> nodes) {
+ this.nodes = nodes;
+ return this;
+ }
+
+ /**
+ * Sets socket / connection timeout.
+ *
+ * @param timeout socket / connection timeout, default value is 2000
+ * @return Builder itself
+ */
+ public Builder setTimeout(int timeout) {
+ this.timeout = timeout;
+ return this;
+ }
+
+ /**
+ * Sets limit of redirection.
+ *
+ * @param maxRedirections limit of redirection, default value is 5
+ * @return Builder itself
+ */
+ public Builder setMaxRedirections(int maxRedirections) {
+ this.maxRedirections = maxRedirections;
+ return this;
+ }
+
+ /**
+ * Sets value for the {@code maxTotal} configuration attribute
+ * for pools to be created with this configuration instance.
+ *
+ * @param maxTotal maxTotal the maximum number of objects that can be allocated by the pool, default value is 8
+ * @return Builder itself
+ */
+ public Builder setMaxTotal(int maxTotal) {
+ this.maxTotal = maxTotal;
+ return this;
+ }
+
+ /**
+ * Sets value for the {@code maxIdle} configuration attribute
+ * for pools to be created with this configuration instance.
+ *
+ * @param maxIdle the cap on the number of "idle" instances in the pool, default value is 8
+ * @return Builder itself
+ */
+ public Builder setMaxIdle(int maxIdle) {
+ this.maxIdle = maxIdle;
+ return this;
+ }
+
+ /**
+ * Sets value for the {@code minIdle} configuration attribute
+ * for pools to be created with this configuration instance.
+ *
+ * @param minIdle the minimum number of idle objects to maintain in the pool, default value is 0
+ * @return Builder itself
+ */
+ public Builder setMinIdle(int minIdle) {
+ this.minIdle = minIdle;
+ return this;
+ }
+
+ /**
+ * Builds JedisClusterConfig.
+ *
+ * @return JedisClusterConfig
+ */
+ public FlinkJedisClusterConfig build() {
+ return new FlinkJedisClusterConfig(nodes, timeout, maxRedirections, maxTotal, maxIdle, minIdle);
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "JedisClusterConfig{" +
+ "nodes=" + nodes +
+ ", timeout=" + connectionTimeout +
+ ", maxRedirections=" + maxRedirections +
+ ", maxTotal=" + maxTotal +
+ ", maxIdle=" + maxIdle +
+ ", minIdle=" + minIdle +
+ '}';
+ }
+}
http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/b2955a74/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBase.java
----------------------------------------------------------------------
diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBase.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBase.java
new file mode 100644
index 0000000..0d821ed
--- /dev/null
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBase.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.config;
+
+import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
+import org.apache.flink.streaming.connectors.redis.common.Util;
+
+import java.io.Serializable;
+
+/**
+ * Base class for Flink Redis configuration.
+ */
+public abstract class FlinkJedisConfigBase implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ protected final int maxTotal;
+ protected final int maxIdle;
+ protected final int minIdle;
+ protected final int connectionTimeout;
+
+ protected FlinkJedisConfigBase(int connectionTimeout, int maxTotal, int maxIdle, int minIdle){
+ Util.checkArgument(connectionTimeout >= 0, "connection timeout can not be negative");
+ Util.checkArgument(maxTotal >= 0, "maxTotal value can not be negative");
+ Util.checkArgument(maxIdle >= 0, "maxIdle value can not be negative");
+ Util.checkArgument(minIdle >= 0, "minIdle value can not be negative");
+
+ this.connectionTimeout = connectionTimeout;
+ this.maxTotal = maxTotal;
+ this.maxIdle = maxIdle;
+ this.minIdle = minIdle;
+ }
+
+ /**
+ * Returns timeout.
+ *
+ * @return connection timeout
+ */
+ public int getConnectionTimeout() {
+ return connectionTimeout;
+ }
+
+ /**
+ * Get the value for the {@code maxTotal} configuration attribute
+ * for pools to be created with this configuration instance.
+ *
+ * @return The current setting of {@code maxTotal} for this
+ * configuration instance
+ * @see GenericObjectPoolConfig#getMaxTotal()
+ */
+ public int getMaxTotal() {
+ return maxTotal;
+ }
+
+ /**
+ * Get the value for the {@code maxIdle} configuration attribute
+ * for pools to be created with this configuration instance.
+ *
+ * @return The current setting of {@code maxIdle} for this
+ * configuration instance
+ * @see GenericObjectPoolConfig#getMaxIdle()
+ */
+ public int getMaxIdle() {
+ return maxIdle;
+ }
+
+ /**
+ * Get the value for the {@code minIdle} configuration attribute
+ * for pools to be created with this configuration instance.
+ *
+ * @return The current setting of {@code minIdle} for this
+ * configuration instance
+ * @see GenericObjectPoolConfig#getMinIdle()
+ */
+ public int getMinIdle() {
+ return minIdle;
+ }
+}
http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/b2955a74/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisPoolConfig.java
----------------------------------------------------------------------
diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisPoolConfig.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisPoolConfig.java
new file mode 100644
index 0000000..d4c30ff
--- /dev/null
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisPoolConfig.java
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.config;
+
+import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
+import redis.clients.jedis.Protocol;
+
+import java.util.Objects;
+
+/**
+ * Configuration for Jedis pool.
+ */
+public class FlinkJedisPoolConfig extends FlinkJedisConfigBase {
+
+ private static final long serialVersionUID = 1L;
+
+ private final String host;
+ private final int port;
+ private final int database;
+ private final String password;
+
+
+ /**
+ * Jedis pool configuration.
+ * The host is mandatory, and when host is not set, it throws NullPointerException.
+ *
+ * @param host hostname or IP
+ * @param port port, default value is 6379
+ * @param connectionTimeout socket / connection timeout, default value is 2000 milli second
+ * @param password password, if any
+ * @param database database index
+ * @param maxTotal the maximum number of objects that can be allocated by the pool, default value is 8
+ * @param maxIdle the cap on the number of "idle" instances in the pool, default value is 8
+ * @param minIdle the minimum number of idle objects to maintain in the pool, default value is 0
+ * @throws NullPointerException if parameter {@code host} is {@code null}
+ */
+ private FlinkJedisPoolConfig(String host, int port, int connectionTimeout, String password, int database,
+ int maxTotal, int maxIdle, int minIdle) {
+ super(connectionTimeout, maxTotal, maxIdle, minIdle);
+ Objects.requireNonNull(host, "Host information should be presented");
+ this.host = host;
+ this.port = port;
+ this.database = database;
+ this.password = password;
+ }
+
+ /**
+ * Returns host.
+ *
+ * @return hostname or IP
+ */
+ public String getHost() {
+ return host;
+ }
+
+ /**
+ * Returns port.
+ *
+ * @return port
+ */
+ public int getPort() {
+ return port;
+ }
+
+
+ /**
+ * Returns database index.
+ *
+ * @return database index
+ */
+ public int getDatabase() {
+ return database;
+ }
+
+ /**
+ * Returns password.
+ *
+ * @return password
+ */
+ public String getPassword() {
+ return password;
+ }
+
+ /**
+ * Builder for initializing {@link FlinkJedisPoolConfig}.
+ */
+ public static class Builder {
+ private String host;
+ private int port = Protocol.DEFAULT_PORT;
+ private int timeout = Protocol.DEFAULT_TIMEOUT;
+ private int database = Protocol.DEFAULT_DATABASE;
+ private String password;
+ private int maxTotal = GenericObjectPoolConfig.DEFAULT_MAX_TOTAL;
+ private int maxIdle = GenericObjectPoolConfig.DEFAULT_MAX_IDLE;
+ private int minIdle = GenericObjectPoolConfig.DEFAULT_MIN_IDLE;
+
+ /**
+ * Sets value for the {@code maxTotal} configuration attribute
+ * for pools to be created with this configuration instance.
+ *
+ * @param maxTotal maxTotal the maximum number of objects that can be allocated by the pool, default value is 8
+ * @return Builder itself
+ */
+ public Builder setMaxTotal(int maxTotal) {
+ this.maxTotal = maxTotal;
+ return this;
+ }
+
+ /**
+ * Sets value for the {@code maxIdle} configuration attribute
+ * for pools to be created with this configuration instance.
+ *
+ * @param maxIdle the cap on the number of "idle" instances in the pool, default value is 8
+ * @return Builder itself
+ */
+ public Builder setMaxIdle(int maxIdle) {
+ this.maxIdle = maxIdle;
+ return this;
+ }
+
+ /**
+ * Sets value for the {@code minIdle} configuration attribute
+ * for pools to be created with this configuration instance.
+ *
+ * @param minIdle the minimum number of idle objects to maintain in the pool, default value is 0
+ * @return Builder itself
+ */
+ public Builder setMinIdle(int minIdle) {
+ this.minIdle = minIdle;
+ return this;
+ }
+
+ /**
+ * Sets host.
+ *
+ * @param host host
+ * @return Builder itself
+ */
+ public Builder setHost(String host) {
+ this.host = host;
+ return this;
+ }
+
+ /**
+ * Sets port.
+ *
+ * @param port port, default value is 6379
+ * @return Builder itself
+ */
+ public Builder setPort(int port) {
+ this.port = port;
+ return this;
+ }
+
+ /**
+ * Sets timeout.
+ *
+ * @param timeout timeout, default value is 2000
+ * @return Builder itself
+ */
+ public Builder setTimeout(int timeout) {
+ this.timeout = timeout;
+ return this;
+ }
+
+ /**
+ * Sets database index.
+ *
+ * @param database database index, default value is 0
+ * @return Builder itself
+ */
+ public Builder setDatabase(int database) {
+ this.database = database;
+ return this;
+ }
+
+ /**
+ * Sets password.
+ *
+ * @param password password, if any
+ * @return Builder itself
+ */
+ public Builder setPassword(String password) {
+ this.password = password;
+ return this;
+ }
+
+
+ /**
+ * Builds JedisPoolConfig.
+ *
+ * @return JedisPoolConfig
+ */
+ public FlinkJedisPoolConfig build() {
+ return new FlinkJedisPoolConfig(host, port, timeout, password, database, maxTotal, maxIdle, minIdle);
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "JedisPoolConfig{" +
+ "host='" + host + '\'' +
+ ", port=" + port +
+ ", timeout=" + connectionTimeout +
+ ", database=" + database +
+ ", maxTotal=" + maxTotal +
+ ", maxIdle=" + maxIdle +
+ ", minIdle=" + minIdle +
+ '}';
+ }
+}
http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/b2955a74/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisSentinelConfig.java
----------------------------------------------------------------------
diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisSentinelConfig.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisSentinelConfig.java
new file mode 100644
index 0000000..6058a53
--- /dev/null
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisSentinelConfig.java
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.config;
+
+import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
+import org.apache.flink.streaming.connectors.redis.common.Util;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import redis.clients.jedis.Protocol;
+
+import java.util.HashSet;
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * Configuration for Jedis Sentinel pool.
+ */
+public class FlinkJedisSentinelConfig extends FlinkJedisConfigBase {
+ private static final long serialVersionUID = 1L;
+
+ private static final Logger LOG = LoggerFactory.getLogger(FlinkJedisSentinelConfig.class);
+
+ private final String masterName;
+ private final Set<String> sentinels;
+ private final int soTimeout;
+ private final String password;
+ private final int database;
+
+ /**
+ * Jedis Sentinels config.
+ * The master name and sentinels are mandatory, and when you didn't set these, it throws NullPointerException.
+ *
+ * @param masterName master name of the replica set
+ * @param sentinels set of sentinel hosts
+ * @param connectionTimeout timeout connection timeout
+ * @param soTimeout timeout socket timeout
+ * @param password password, if any
+ * @param database database database index
+ * @param maxTotal maxTotal the maximum number of objects that can be allocated by the pool
+ * @param maxIdle the cap on the number of "idle" instances in the pool
+ * @param minIdle the minimum number of idle objects to maintain in the pool
+ *
+ * @throws NullPointerException if {@code masterName} or {@code sentinels} is {@code null}
+ * @throws IllegalArgumentException if {@code sentinels} are empty
+ */
+ private FlinkJedisSentinelConfig(String masterName, Set<String> sentinels,
+ int connectionTimeout, int soTimeout,
+ String password, int database,
+ int maxTotal, int maxIdle, int minIdle) {
+ super(connectionTimeout, maxTotal, maxIdle, minIdle);
+ Objects.requireNonNull(masterName, "Master name should be presented");
+ Objects.requireNonNull(sentinels, "Sentinels information should be presented");
+ Util.checkArgument(!sentinels.isEmpty(), "Sentinel hosts should not be empty");
+
+ this.masterName = masterName;
+ this.sentinels = new HashSet<>(sentinels);
+ this.soTimeout = soTimeout;
+ this.password = password;
+ this.database = database;
+ }
+
+ /**
+ * Returns master name of the replica set.
+ *
+ * @return master name of the replica set.
+ */
+ public String getMasterName() {
+ return masterName;
+ }
+
+ /**
+ * Returns Sentinels host addresses.
+ *
+ * @return Set of Sentinels host addresses
+ */
+ public Set<String> getSentinels() {
+ return sentinels;
+ }
+
+ /**
+ * Returns socket timeout.
+ *
+ * @return socket timeout
+ */
+ public int getSoTimeout() {
+ return soTimeout;
+ }
+
+ /**
+ * Returns password.
+ *
+ * @return password
+ */
+ public String getPassword() {
+ return password;
+ }
+
+ /**
+ * Returns database index.
+ *
+ * @return database index
+ */
+ public int getDatabase() {
+ return database;
+ }
+
+ /**
+ * Builder for initializing {@link FlinkJedisSentinelConfig}.
+ */
+ public static class Builder {
+ private String masterName;
+ private Set<String> sentinels;
+ private int connectionTimeout = Protocol.DEFAULT_TIMEOUT;
+ private int soTimeout = Protocol.DEFAULT_TIMEOUT;
+ private String password;
+ private int database = Protocol.DEFAULT_DATABASE;
+ private int maxTotal = GenericObjectPoolConfig.DEFAULT_MAX_TOTAL;
+ private int maxIdle = GenericObjectPoolConfig.DEFAULT_MAX_IDLE;
+ private int minIdle = GenericObjectPoolConfig.DEFAULT_MIN_IDLE;
+
+ /**
+ * Sets master name of the replica set.
+ *
+ * @param masterName master name of the replica set
+ * @return Builder itself
+ */
+ public Builder setMasterName(String masterName) {
+ this.masterName = masterName;
+ return this;
+ }
+
+ /**
+ * Sets sentinels address.
+ *
+ * @param sentinels host set of the sentinels
+ * @return Builder itself
+ */
+ public Builder setSentinels(Set<String> sentinels) {
+ this.sentinels = sentinels;
+ return this;
+ }
+
+ /**
+ * Sets connection timeout.
+ *
+ * @param connectionTimeout connection timeout, default value is 2000
+ * @return Builder itself
+ */
+ public Builder setConnectionTimeout(int connectionTimeout) {
+ this.connectionTimeout = connectionTimeout;
+ return this;
+ }
+
+ /**
+ * Sets socket timeout.
+ *
+ * @param soTimeout socket timeout, default value is 2000
+ * @return Builder itself
+ */
+ public Builder setSoTimeout(int soTimeout) {
+ this.soTimeout = soTimeout;
+ return this;
+ }
+
+ /**
+ * Sets password.
+ *
+ * @param password password, if any
+ * @return Builder itself
+ */
+ public Builder setPassword(String password) {
+ this.password = password;
+ return this;
+ }
+
+ /**
+ * Sets database index.
+ *
+ * @param database database index, default value is 0
+ * @return Builder itself
+ */
+ public Builder setDatabase(int database) {
+ this.database = database;
+ return this;
+ }
+
+ /**
+ * Sets value for the {@code maxTotal} configuration attribute
+ * for pools to be created with this configuration instance.
+ *
+ * @param maxTotal maxTotal the maximum number of objects that can be allocated by the pool, default value is 8
+ * @return Builder itself
+ */
+ public Builder setMaxTotal(int maxTotal) {
+ this.maxTotal = maxTotal;
+ return this;
+ }
+
+ /**
+ * Sets value for the {@code maxIdle} configuration attribute
+ * for pools to be created with this configuration instance.
+ *
+ * @param maxIdle the cap on the number of "idle" instances in the pool, default value is 8
+ * @return Builder itself
+ */
+ public Builder setMaxIdle(int maxIdle) {
+ this.maxIdle = maxIdle;
+ return this;
+ }
+
+ /**
+ * Sets value for the {@code minIdle} configuration attribute
+ * for pools to be created with this configuration instance.
+ *
+ * @param minIdle the minimum number of idle objects to maintain in the pool, default value is 0
+ * @return Builder itself
+ */
+ public Builder setMinIdle(int minIdle) {
+ this.minIdle = minIdle;
+ return this;
+ }
+
+ /**
+ * Builds JedisSentinelConfig.
+ *
+ * @return JedisSentinelConfig
+ */
+ public FlinkJedisSentinelConfig build(){
+ return new FlinkJedisSentinelConfig(masterName, sentinels, connectionTimeout, soTimeout,
+ password, database, maxTotal, maxIdle, minIdle);
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "JedisSentinelConfig{" +
+ "masterName='" + masterName + '\'' +
+ ", connectionTimeout=" + connectionTimeout +
+ ", soTimeout=" + soTimeout +
+ ", database=" + database +
+ ", maxTotal=" + maxTotal +
+ ", maxIdle=" + maxIdle +
+ ", minIdle=" + minIdle +
+ '}';
+ }
+}
http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/b2955a74/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java
----------------------------------------------------------------------
diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java
new file mode 100644
index 0000000..cc1d626
--- /dev/null
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.container;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import redis.clients.jedis.JedisCluster;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Objects;
+
+/**
+ * Redis command container if we want to connect to a Redis cluster.
+ */
+public class RedisClusterContainer implements RedisCommandsContainer, Closeable {
+
+ private static final long serialVersionUID = 1L;
+
+ private static final Logger LOG = LoggerFactory.getLogger(RedisClusterContainer.class);
+
+ private transient JedisCluster jedisCluster;
+
+ /**
+ * Initialize Redis command container for Redis cluster.
+ *
+ * @param jedisCluster JedisCluster instance
+ */
+ public RedisClusterContainer(JedisCluster jedisCluster) {
+ Objects.requireNonNull(jedisCluster, "Jedis cluster can not be null");
+
+ this.jedisCluster = jedisCluster;
+ }
+
+ @Override
+ public void open() throws Exception {
+
+ // echo() tries to open a connection and echos back the
+ // message passed as argument. Here we use it to monitor
+ // if we can communicate with the cluster.
+
+ jedisCluster.echo("Test");
+ }
+
+ @Override
+ public void hset(final String key, final String hashField, final String value) {
+ try {
+ jedisCluster.hset(key, hashField, value);
+ } catch (Exception e) {
+ if (LOG.isErrorEnabled()) {
+ LOG.error("Cannot send Redis message with command HSET to hash {} error message {}",
+ key, hashField, e.getMessage());
+ }
+ throw e;
+ }
+ }
+
+ @Override
+ public void rpush(final String listName, final String value) {
+ try {
+ jedisCluster.rpush(listName, value);
+ } catch (Exception e) {
+ if (LOG.isErrorEnabled()) {
+ LOG.error("Cannot send Redis message with command RPUSH to list {} error message: {}",
+ listName, e.getMessage());
+ }
+ throw e;
+ }
+ }
+
+ @Override
+ public void lpush(String listName, String value) {
+ try {
+ jedisCluster.lpush(listName, value);
+ } catch (Exception e) {
+ if (LOG.isErrorEnabled()) {
+ LOG.error("Cannot send Redis message with command LPUSH to list {} error message: {}",
+ listName, e.getMessage());
+ }
+ throw e;
+ }
+ }
+
+ @Override
+ public void sadd(final String setName, final String value) {
+ try {
+ jedisCluster.sadd(setName, value);
+ } catch (Exception e) {
+ if (LOG.isErrorEnabled()) {
+ LOG.error("Cannot send Redis message with command RPUSH to set {} error message {}",
+ setName, e.getMessage());
+ }
+ throw e;
+ }
+ }
+
+ @Override
+ public void publish(final String channelName, final String message) {
+ try {
+ jedisCluster.publish(channelName, message);
+ } catch (Exception e) {
+ if (LOG.isErrorEnabled()) {
+ LOG.error("Cannot send Redis message with command PUBLISH to channel {} error message {}",
+ channelName, e.getMessage());
+ }
+ throw e;
+ }
+ }
+
+ @Override
+ public void set(final String key, final String value) {
+ try {
+ jedisCluster.set(key, value);
+ } catch (Exception e) {
+ if (LOG.isErrorEnabled()) {
+ LOG.error("Cannot send Redis message with command SET to key {} error message {}",
+ key, e.getMessage());
+ }
+ throw e;
+ }
+ }
+
+ @Override
+ public void pfadd(final String key, final String element) {
+ try {
+ jedisCluster.set(key, element);
+ } catch (Exception e) {
+ if (LOG.isErrorEnabled()) {
+ LOG.error("Cannot send Redis message with command PFADD to key {} error message {}",
+ key, e.getMessage());
+ }
+ throw e;
+ }
+ }
+
+ @Override
+ public void zadd(final String key, final String score, final String element) {
+ try {
+ jedisCluster.zadd(key, Double.valueOf(score), element);
+ } catch (Exception e) {
+ if (LOG.isErrorEnabled()) {
+ LOG.error("Cannot send Redis message with command ZADD to set {} error message {}",
+ key, e.getMessage());
+ }
+ throw e;
+ }
+ }
+
+ /**
+ * Closes the {@link JedisCluster}.
+ */
+ @Override
+ public void close() throws IOException {
+ this.jedisCluster.close();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/b2955a74/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainer.java
----------------------------------------------------------------------
diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainer.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainer.java
new file mode 100644
index 0000000..78771f1
--- /dev/null
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainer.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.container;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+/**
+ * The container for all available Redis commands.
+ */
+public interface RedisCommandsContainer extends Serializable {
+
+ /**
+ * Open the Jedis container.
+ *
+ * @throws Exception if the instance can not be opened properly
+ */
+ void open() throws Exception;
+
+ /**
+ * Sets field in the hash stored at key to value.
+ * If key does not exist, a new key holding a hash is created.
+ * If field already exists in the hash, it is overwritten.
+ *
+ * @param key Hash name
+ * @param hashField Hash field
+ * @param value Hash value
+ */
+ void hset(String key, String hashField, String value);
+
+ /**
+ * Insert the specified value at the tail of the list stored at key.
+ * If key does not exist, it is created as empty list before performing the push operation.
+ *
+ * @param listName Name of the List
+ * @param value Value to be added
+ */
+ void rpush(String listName, String value);
+
+ /**
+ * Insert the specified value at the head of the list stored at key.
+ * If key does not exist, it is created as empty list before performing the push operation.
+ *
+ * @param listName Name of the List
+ * @param value Value to be added
+ */
+ void lpush(String listName, String value);
+
+ /**
+ * Add the specified member to the set stored at key.
+ * Specified members that are already a member of this set are ignored.
+ * If key does not exist, a new set is created before adding the specified members.
+ *
+ * @param setName Name of the Set
+ * @param value Value to be added
+ */
+ void sadd(String setName, String value);
+
+ /**
+ * Posts a message to the given channel.
+ *
+ * @param channelName Name of the channel to which data will be published
+ * @param message the message
+ */
+ void publish(String channelName, String message);
+
+ /**
+ * Set key to hold the string value. If key already holds a value, it is overwritten,
+ * regardless of its type. Any previous time to live associated with the key is
+ * discarded on successful SET operation.
+ *
+ * @param key the key name in which value to be set
+ * @param value the value
+ */
+ void set(String key, String value);
+
+ /**
+ * Adds all the element arguments to the HyperLogLog data structure
+ * stored at the variable name specified as first argument.
+ *
+ * @param key The name of the key
+ * @param element the element
+ */
+ void pfadd(String key, String element);
+
+ /**
+ * Adds the specified member with the specified scores to the sorted set stored at key.
+ *
+ * @param key The name of the Sorted Set
+ * @param score Score of the element
+ * @param element element to be added
+ */
+ void zadd(String key, String score, String element);
+
+ /**
+ * Close the Jedis container.
+ *
+ * @throws IOException if the instance can not be closed properly
+ */
+ void close() throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/b2955a74/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilder.java
----------------------------------------------------------------------
diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilder.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilder.java
new file mode 100644
index 0000000..0db5b05
--- /dev/null
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilder.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.container;
+
+import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
+import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisClusterConfig;
+import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisConfigBase;
+import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
+import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisSentinelConfig;
+import redis.clients.jedis.JedisCluster;
+import redis.clients.jedis.JedisPool;
+import redis.clients.jedis.JedisSentinelPool;
+
+import java.util.Objects;
+
+/**
+ * The builder for {@link RedisCommandsContainer}.
+ */
+public class RedisCommandsContainerBuilder {
+
+ /**
+ * Initialize the {@link RedisCommandsContainer} based on the instance type.
+ * @param flinkJedisConfigBase configuration base
+ * @return @throws IllegalArgumentException if jedisPoolConfig, jedisClusterConfig and jedisSentinelConfig are all null
+ */
+ public static RedisCommandsContainer build(FlinkJedisConfigBase flinkJedisConfigBase){
+ if(flinkJedisConfigBase instanceof FlinkJedisPoolConfig){
+ FlinkJedisPoolConfig flinkJedisPoolConfig = (FlinkJedisPoolConfig) flinkJedisConfigBase;
+ return RedisCommandsContainerBuilder.build(flinkJedisPoolConfig);
+ } else if (flinkJedisConfigBase instanceof FlinkJedisClusterConfig) {
+ FlinkJedisClusterConfig flinkJedisClusterConfig = (FlinkJedisClusterConfig) flinkJedisConfigBase;
+ return RedisCommandsContainerBuilder.build(flinkJedisClusterConfig);
+ } else if (flinkJedisConfigBase instanceof FlinkJedisSentinelConfig) {
+ FlinkJedisSentinelConfig flinkJedisSentinelConfig = (FlinkJedisSentinelConfig) flinkJedisConfigBase;
+ return RedisCommandsContainerBuilder.build(flinkJedisSentinelConfig);
+ } else {
+ throw new IllegalArgumentException("Jedis configuration not found");
+ }
+ }
+
+ /**
+ * Builds container for single Redis environment.
+ *
+ * @param jedisPoolConfig configuration for JedisPool
+ * @return container for single Redis environment
+ * @throws NullPointerException if jedisPoolConfig is null
+ */
+ public static RedisCommandsContainer build(FlinkJedisPoolConfig jedisPoolConfig) {
+ Objects.requireNonNull(jedisPoolConfig, "Redis pool config should not be Null");
+
+ GenericObjectPoolConfig genericObjectPoolConfig = new GenericObjectPoolConfig();
+ genericObjectPoolConfig.setMaxIdle(jedisPoolConfig.getMaxIdle());
+ genericObjectPoolConfig.setMaxTotal(jedisPoolConfig.getMaxTotal());
+ genericObjectPoolConfig.setMinIdle(jedisPoolConfig.getMinIdle());
+
+ JedisPool jedisPool = new JedisPool(genericObjectPoolConfig, jedisPoolConfig.getHost(),
+ jedisPoolConfig.getPort(), jedisPoolConfig.getConnectionTimeout(), jedisPoolConfig.getPassword(),
+ jedisPoolConfig.getDatabase());
+ return new RedisContainer(jedisPool);
+ }
+
+ /**
+ * Builds container for Redis Cluster environment.
+ *
+ * @param jedisClusterConfig configuration for JedisCluster
+ * @return container for Redis Cluster environment
+ * @throws NullPointerException if jedisClusterConfig is null
+ */
+ public static RedisCommandsContainer build(FlinkJedisClusterConfig jedisClusterConfig) {
+ Objects.requireNonNull(jedisClusterConfig, "Redis cluster config should not be Null");
+
+ GenericObjectPoolConfig genericObjectPoolConfig = new GenericObjectPoolConfig();
+ genericObjectPoolConfig.setMaxIdle(jedisClusterConfig.getMaxIdle());
+ genericObjectPoolConfig.setMaxTotal(jedisClusterConfig.getMaxTotal());
+ genericObjectPoolConfig.setMinIdle(jedisClusterConfig.getMinIdle());
+
+ JedisCluster jedisCluster = new JedisCluster(jedisClusterConfig.getNodes(), jedisClusterConfig.getConnectionTimeout(),
+ jedisClusterConfig.getMaxRedirections(), genericObjectPoolConfig);
+ return new RedisClusterContainer(jedisCluster);
+ }
+
+ /**
+ * Builds container for Redis Sentinel environment.
+ *
+ * @param jedisSentinelConfig configuration for JedisSentinel
+ * @return container for Redis sentinel environment
+ * @throws NullPointerException if jedisSentinelConfig is null
+ */
+ public static RedisCommandsContainer build(FlinkJedisSentinelConfig jedisSentinelConfig) {
+ Objects.requireNonNull(jedisSentinelConfig, "Redis sentinel config should not be Null");
+
+ GenericObjectPoolConfig genericObjectPoolConfig = new GenericObjectPoolConfig();
+ genericObjectPoolConfig.setMaxIdle(jedisSentinelConfig.getMaxIdle());
+ genericObjectPoolConfig.setMaxTotal(jedisSentinelConfig.getMaxTotal());
+ genericObjectPoolConfig.setMinIdle(jedisSentinelConfig.getMinIdle());
+
+ JedisSentinelPool jedisSentinelPool = new JedisSentinelPool(jedisSentinelConfig.getMasterName(),
+ jedisSentinelConfig.getSentinels(), genericObjectPoolConfig,
+ jedisSentinelConfig.getConnectionTimeout(), jedisSentinelConfig.getSoTimeout(),
+ jedisSentinelConfig.getPassword(), jedisSentinelConfig.getDatabase());
+ return new RedisContainer(jedisSentinelPool);
+ }
+}
http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/b2955a74/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java
----------------------------------------------------------------------
diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java
new file mode 100644
index 0000000..fb73a27
--- /dev/null
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.container;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import redis.clients.jedis.Jedis;
+import redis.clients.jedis.JedisPool;
+import redis.clients.jedis.JedisSentinelPool;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Objects;
+
+/**
+ * Redis command container if we want to connect to a single Redis server or to Redis sentinels
+ * If want to connect to a single Redis server, please use the first constructor {@link #RedisContainer(JedisPool)}.
+ * If want to connect to a Redis sentinels, please use the second constructor {@link #RedisContainer(JedisSentinelPool)}
+ */
+public class RedisContainer implements RedisCommandsContainer, Closeable {
+
+ private static final long serialVersionUID = 1L;
+
+ private static final Logger LOG = LoggerFactory.getLogger(RedisContainer.class);
+
+ private transient JedisPool jedisPool;
+ private transient JedisSentinelPool jedisSentinelPool;
+
+ /**
+ * Use this constructor if to connect with single Redis server.
+ *
+ * @param jedisPool JedisPool which actually manages Jedis instances
+ */
+ public RedisContainer(JedisPool jedisPool) {
+ Objects.requireNonNull(jedisPool, "Jedis Pool can not be null");
+ this.jedisPool = jedisPool;
+ this.jedisSentinelPool = null;
+ }
+
+ /**
+ * Use this constructor if Redis environment is clustered with sentinels.
+ *
+ * @param sentinelPool SentinelPool which actually manages Jedis instances
+ */
+ public RedisContainer(final JedisSentinelPool sentinelPool) {
+ Objects.requireNonNull(sentinelPool, "Jedis Sentinel Pool can not be null");
+ this.jedisPool = null;
+ this.jedisSentinelPool = sentinelPool;
+ }
+
+ /**
+ * Closes the Jedis instances.
+ */
+ @Override
+ public void close() throws IOException {
+ if (this.jedisPool != null) {
+ this.jedisPool.close();
+ }
+ if (this.jedisSentinelPool != null) {
+ this.jedisSentinelPool.close();
+ }
+ }
+
+ @Override
+ public void open() throws Exception {
+
+ // echo() tries to open a connection and echos back the
+ // message passed as argument. Here we use it to monitor
+ // if we can communicate with the cluster.
+
+ getInstance().echo("Test");
+ }
+
+ @Override
+ public void hset(final String key, final String hashField, final String value) {
+ Jedis jedis = null;
+ try {
+ jedis = getInstance();
+ jedis.hset(key, hashField, value);
+ } catch (Exception e) {
+ if (LOG.isErrorEnabled()) {
+ LOG.error("Cannot send Redis message with command HSET to key {} and hashField {} error message {}",
+ key, hashField, e.getMessage());
+ }
+ throw e;
+ } finally {
+ releaseInstance(jedis);
+ }
+ }
+
+ @Override
+ public void rpush(final String listName, final String value) {
+ Jedis jedis = null;
+ try {
+ jedis = getInstance();
+ jedis.rpush(listName, value);
+ } catch (Exception e) {
+ if (LOG.isErrorEnabled()) {
+ LOG.error("Cannot send Redis message with command RPUSH to list {} error message {}",
+ listName, e.getMessage());
+ }
+ throw e;
+ } finally {
+ releaseInstance(jedis);
+ }
+ }
+
+ @Override
+ public void lpush(String listName, String value) {
+ Jedis jedis = null;
+ try {
+ jedis = getInstance();
+ jedis.lpush(listName, value);
+ } catch (Exception e) {
+ if (LOG.isErrorEnabled()) {
+ LOG.error("Cannot send Redis message with command LUSH to list {} error message {}",
+ listName, e.getMessage());
+ }
+ throw e;
+ } finally {
+ releaseInstance(jedis);
+ }
+ }
+
+ @Override
+ public void sadd(final String setName, final String value) {
+ Jedis jedis = null;
+ try {
+ jedis = getInstance();
+ jedis.sadd(setName, value);
+ } catch (Exception e) {
+ if (LOG.isErrorEnabled()) {
+ LOG.error("Cannot send Redis message with command RPUSH to set {} error message {}",
+ setName, e.getMessage());
+ }
+ throw e;
+ } finally {
+ releaseInstance(jedis);
+ }
+ }
+
+ @Override
+ public void publish(final String channelName, final String message) {
+ Jedis jedis = null;
+ try {
+ jedis = getInstance();
+ jedis.publish(channelName, message);
+ } catch (Exception e) {
+ if (LOG.isErrorEnabled()) {
+ LOG.error("Cannot send Redis message with command PUBLISH to channel {} error message {}",
+ channelName, e.getMessage());
+ }
+ throw e;
+ } finally {
+ releaseInstance(jedis);
+ }
+ }
+
+ @Override
+ public void set(final String key, final String value) {
+ Jedis jedis = null;
+ try {
+ jedis = getInstance();
+ jedis.set(key, value);
+ } catch (Exception e) {
+ if (LOG.isErrorEnabled()) {
+ LOG.error("Cannot send Redis message with command SET to key {} error message {}",
+ key, e.getMessage());
+ }
+ throw e;
+ } finally {
+ releaseInstance(jedis);
+ }
+ }
+
+ @Override
+ public void pfadd(final String key, final String element) {
+ Jedis jedis = null;
+ try {
+ jedis = getInstance();
+ jedis.pfadd(key, element);
+ } catch (Exception e) {
+ if (LOG.isErrorEnabled()) {
+ LOG.error("Cannot send Redis message with command PFADD to key {} error message {}",
+ key, e.getMessage());
+ }
+ throw e;
+ } finally {
+ releaseInstance(jedis);
+ }
+ }
+
+ @Override
+ public void zadd(final String key, final String score, final String element) {
+ Jedis jedis = null;
+ try {
+ jedis = getInstance();
+ jedis.zadd(key, Double.valueOf(score), element);
+ } catch (Exception e) {
+ if (LOG.isErrorEnabled()) {
+ LOG.error("Cannot send Redis message with command ZADD to set {} error message {}",
+ key, e.getMessage());
+ }
+ throw e;
+ } finally {
+ releaseInstance(jedis);
+ }
+ }
+
+ /**
+ * Returns Jedis instance from the pool.
+ *
+ * @return the Jedis instance
+ */
+ private Jedis getInstance() {
+ if (jedisSentinelPool != null) {
+ return jedisSentinelPool.getResource();
+ } else {
+ return jedisPool.getResource();
+ }
+ }
+
+ /**
+ * Closes the jedis instance after finishing the command.
+ *
+ * @param jedis The jedis instance
+ */
+ private void releaseInstance(final Jedis jedis) {
+ if (jedis == null) {
+ return;
+ }
+ try {
+ jedis.close();
+ } catch (Exception e) {
+ LOG.error("Failed to close (return) instance to pool", e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/b2955a74/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommand.java
----------------------------------------------------------------------
diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommand.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommand.java
new file mode 100644
index 0000000..cf9842c
--- /dev/null
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommand.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.mapper;
+
+/**
+ * All available commands for Redis. Each command belongs to a {@link RedisDataType} group.
+ */
+public enum RedisCommand {
+
+ /**
+ * Insert the specified value at the head of the list stored at key.
+ * If key does not exist, it is created as empty list before performing the push operations.
+ */
+ LPUSH(RedisDataType.LIST),
+
+ /**
+ * Insert the specified value at the tail of the list stored at key.
+ * If key does not exist, it is created as empty list before performing the push operation.
+ */
+ RPUSH(RedisDataType.LIST),
+
+ /**
+ * Add the specified member to the set stored at key.
+ * Specified member that is already a member of this set is ignored.
+ */
+ SADD(RedisDataType.SET),
+
+ /**
+ * Set key to hold the string value. If key already holds a value,
+ * it is overwritten, regardless of its type.
+ */
+ SET(RedisDataType.STRING),
+
+ /**
+ * Adds the element to the HyperLogLog data structure stored at the variable name specified as first argument.
+ */
+ PFADD(RedisDataType.HYPER_LOG_LOG),
+
+ /**
+ * Posts a message to the given channel.
+ */
+ PUBLISH(RedisDataType.PUBSUB),
+
+ /**
+ * Adds the specified members with the specified score to the sorted set stored at key.
+ */
+ ZADD(RedisDataType.SORTED_SET),
+
+ /**
+ * Sets field in the hash stored at key to value. If key does not exist,
+ * a new key holding a hash is created. If field already exists in the hash, it is overwritten.
+ */
+ HSET(RedisDataType.HASH);
+
+ /**
+ * The {@link RedisDataType} this command belongs to.
+ */
+ private RedisDataType redisDataType;
+
+ RedisCommand(RedisDataType redisDataType) {
+ this.redisDataType = redisDataType;
+ }
+
+
+ /**
+ * The {@link RedisDataType} this command belongs to.
+ * @return the {@link RedisDataType}
+ */
+ public RedisDataType getRedisDataType(){
+ return redisDataType;
+ }
+}
http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/b2955a74/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommandDescription.java
----------------------------------------------------------------------
diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommandDescription.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommandDescription.java
new file mode 100644
index 0000000..6ab329f
--- /dev/null
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommandDescription.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.mapper;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+/**
+ * The description of the command type. This must be passed while creating new {@link RedisMapper}.
+ * <p>When creating descriptor for the group of {@link RedisDataType#HASH} and {@link RedisDataType#SORTED_SET},
+ * you need to use first constructor {@link #RedisCommandDescription(RedisCommand, String)}.
+ * If the {@code additionalKey} is {@code null} it will throw {@code IllegalArgumentException}
+ *
+ * <p>When {@link RedisCommand} is not in group of {@link RedisDataType#HASH} and {@link RedisDataType#SORTED_SET}
+ * you can use second constructor {@link #RedisCommandDescription(RedisCommand)}
+ */
+public class RedisCommandDescription implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ private RedisCommand redisCommand;
+
+ /**
+ * This additional key is needed for the group {@link RedisDataType#HASH} and {@link RedisDataType#SORTED_SET}.
+ * Other {@link RedisDataType} works only with two variable i.e. name of the list and value to be added.
+ * But for {@link RedisDataType#HASH} and {@link RedisDataType#SORTED_SET} we need three variables.
+ * <p>For {@link RedisDataType#HASH} we need hash name, hash key and element.
+ * {@link #getAdditionalKey()} used as hash name for {@link RedisDataType#HASH}
+ * <p>For {@link RedisDataType#SORTED_SET} we need set name, the element and it's score.
+ * {@link #getAdditionalKey()} used as set name for {@link RedisDataType#SORTED_SET}
+ */
+ private String additionalKey;
+
+ /**
+ * Use this constructor when data type is {@link RedisDataType#HASH} or {@link RedisDataType#SORTED_SET}.
+ * If different data type is specified, {@code additionalKey} is ignored.
+ * @param redisCommand the redis command type {@link RedisCommand}
+ * @param additionalKey additional key for Hash and Sorted set data type
+ */
+ public RedisCommandDescription(RedisCommand redisCommand, String additionalKey) {
+ Objects.requireNonNull(redisCommand, "Redis command type can not be null");
+ this.redisCommand = redisCommand;
+ this.additionalKey = additionalKey;
+
+ if (redisCommand.getRedisDataType() == RedisDataType.HASH ||
+ redisCommand.getRedisDataType() == RedisDataType.SORTED_SET) {
+ if (additionalKey == null) {
+ throw new IllegalArgumentException("Hash and Sorted Set should have additional key");
+ }
+ }
+ }
+
+ /**
+ * Use this constructor when command type is not in group {@link RedisDataType#HASH} or {@link RedisDataType#SORTED_SET}.
+ *
+ * @param redisCommand the redis data type {@link RedisCommand}
+ */
+ public RedisCommandDescription(RedisCommand redisCommand) {
+ this(redisCommand, null);
+ }
+
+ /**
+ * Returns the {@link RedisCommand}.
+ *
+ * @return the command type of the mapping
+ */
+ public RedisCommand getCommand() {
+ return redisCommand;
+ }
+
+ /**
+ * Returns the additional key if data type is {@link RedisDataType#HASH} and {@link RedisDataType#SORTED_SET}.
+ *
+ * @return the additional key
+ */
+ public String getAdditionalKey() {
+ return additionalKey;
+ }
+}
http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/b2955a74/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisDataType.java
----------------------------------------------------------------------
diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisDataType.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisDataType.java
new file mode 100644
index 0000000..989221c
--- /dev/null
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisDataType.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.mapper;
+
+/**
+ * All available data type for Redis.
+ */
+public enum RedisDataType {
+
+ /**
+ * Strings are the most basic kind of Redis value. Redis Strings are binary safe,
+ * this means that a Redis string can contain any kind of data, for instance a JPEG image or a serialized Ruby object.
+ * A String value can be at max 512 Megabytes in length.
+ */
+ STRING,
+
+ /**
+ * Redis Hashes are maps between string fields and string values.
+ */
+ HASH,
+
+ /**
+ * Redis Lists are simply lists of strings, sorted by insertion order.
+ */
+ LIST,
+
+ /**
+ * Redis Sets are an unordered collection of Strings.
+ */
+ SET,
+
+ /**
+ * Redis Sorted Sets are, similarly to Redis Sets, non repeating collections of Strings.
+ * The difference is that every member of a Sorted Set is associated with score,
+ * that is used in order to take the sorted set ordered, from the smallest to the greatest score.
+ * While members are unique, scores may be repeated.
+ */
+ SORTED_SET,
+
+ /**
+ * HyperLogLog is a probabilistic data structure used in order to count unique things.
+ */
+ HYPER_LOG_LOG,
+
+ /**
+ * Redis implementation of publish and subscribe paradigm. Published messages are characterized into channels,
+ * without knowledge of what (if any) subscribers there may be.
+ * Subscribers express interest in one or more channels, and only receive messages
+ * that are of interest, without knowledge of what (if any) publishers there are.
+ */
+ PUBSUB
+}
http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/b2955a74/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisMapper.java
----------------------------------------------------------------------
diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisMapper.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisMapper.java
new file mode 100644
index 0000000..b2580a7
--- /dev/null
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisMapper.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.mapper;
+
+import org.apache.flink.api.common.functions.Function;
+
+import java.io.Serializable;
+
+/**
+ * Function that creates the description how the input data should be mapped to redis type.
+ *<p>Example:
+ *<pre>{@code
+ *private static class RedisTestMapper implements RedisMapper<Tuple2<String, String>> {
+ * public RedisDataTypeDescription getCommandDescription() {
+ * return new RedisDataTypeDescription(RedisCommand.PUBLISH);
+ * }
+ * public String getKeyFromData(Tuple2<String, String> data) {
+ * return data.f0;
+ * }
+ * public String getValueFromData(Tuple2<String, String> data) {
+ * return data.f1;
+ * }
+ *}
+ *}</pre>
+ *
+ * @param <T> The type of the element handled by this {@code RedisMapper}
+ */
+public interface RedisMapper<T> extends Function, Serializable {
+
+ /**
+ * Returns descriptor which defines data type.
+ *
+ * @return data type descriptor
+ */
+ RedisCommandDescription getCommandDescription();
+
+ /**
+ * Extracts key from data.
+ *
+ * @param data source data
+ * @return key
+ */
+ String getKeyFromData(T data);
+
+ /**
+ * Extracts value from data.
+ *
+ * @param data source data
+ * @return value
+ */
+ String getValueFromData(T data);
+}