You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2016/12/14 15:03:42 UTC

[2/2] flink git commit: [FLINK4429] Remove redis connector (now in Apache Bahir)

[FLINK4429] Remove redis connector (now in Apache Bahir)


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8038ae4c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8038ae4c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8038ae4c

Branch: refs/heads/master
Commit: 8038ae4c843f802c195e757100b1bbc1d5de3ea8
Parents: 79d7e30
Author: Robert Metzger <rm...@apache.org>
Authored: Wed Dec 14 14:56:04 2016 +0100
Committer: Robert Metzger <rm...@apache.org>
Committed: Wed Dec 14 16:02:38 2016 +0100

----------------------------------------------------------------------
 docs/dev/connectors/index.md                    |   1 -
 docs/dev/connectors/redis.md                    | 174 -------------
 flink-connectors/flink-connector-redis/pom.xml  |  79 ------
 .../streaming/connectors/redis/RedisSink.java   | 188 --------------
 .../common/config/FlinkJedisClusterConfig.java  | 187 -------------
 .../common/config/FlinkJedisConfigBase.java     |  90 -------
 .../common/config/FlinkJedisPoolConfig.java     | 224 ----------------
 .../common/config/FlinkJedisSentinelConfig.java | 259 -------------------
 .../common/container/RedisClusterContainer.java | 171 ------------
 .../container/RedisCommandsContainer.java       | 115 --------
 .../RedisCommandsContainerBuilder.java          | 116 ---------
 .../redis/common/container/RedisContainer.java  | 252 ------------------
 .../redis/common/mapper/RedisCommand.java       |  86 ------
 .../common/mapper/RedisCommandDescription.java  |  94 -------
 .../redis/common/mapper/RedisDataType.java      |  66 -----
 .../redis/common/mapper/RedisMapper.java        |  66 -----
 .../connectors/redis/RedisITCaseBase.java       |  45 ----
 .../redis/RedisSentinelClusterTest.java         | 100 -------
 .../connectors/redis/RedisSinkITCase.java       | 233 -----------------
 .../redis/RedisSinkPublishITCase.java           | 137 ----------
 .../connectors/redis/RedisSinkTest.java         | 144 -----------
 .../common/config/FlinkJedisConfigBaseTest.java |  50 ----
 .../common/config/JedisClusterConfigTest.java   |  49 ----
 .../common/config/JedisPoolConfigTest.java      |  29 ---
 .../common/config/JedisSentinelConfigTest.java  |  49 ----
 .../mapper/RedisDataTypeDescriptionTest.java    |  41 ---
 flink-connectors/pom.xml                        |   1 -
 27 files changed, 3046 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8038ae4c/docs/dev/connectors/index.md
----------------------------------------------------------------------
diff --git a/docs/dev/connectors/index.md b/docs/dev/connectors/index.md
index 5de5300..8764463 100644
--- a/docs/dev/connectors/index.md
+++ b/docs/dev/connectors/index.md
@@ -38,7 +38,6 @@ Currently these systems are supported:
  * [Twitter Streaming API](https://dev.twitter.com/docs/streaming-apis) (source)
  * [Apache NiFi](https://nifi.apache.org) (sink/source)
  * [Apache Cassandra](https://cassandra.apache.org/) (sink)
- * [Redis](http://redis.io/) (sink)
 
 To run an application using one of these connectors, additional third party
 components are usually required to be installed and launched, e.g. the servers

http://git-wip-us.apache.org/repos/asf/flink/blob/8038ae4c/docs/dev/connectors/redis.md
----------------------------------------------------------------------
diff --git a/docs/dev/connectors/redis.md b/docs/dev/connectors/redis.md
deleted file mode 100644
index 0e3287d..0000000
--- a/docs/dev/connectors/redis.md
+++ /dev/null
@@ -1,174 +0,0 @@
----
-title: "Redis Connector"
-nav-title: Redis
-nav-parent_id: connectors
-nav-pos: 8
----
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-
-This connector provides a Sink that can write to
-[Redis](http://redis.io/) and also can publish data to [Redis PubSub](http://redis.io/topics/pubsub). To use this connector, add the
-following dependency to your project:
-{% highlight xml %}
-<dependency>
-  <groupId>org.apache.flink</groupId>
-  <artifactId>flink-connector-redis{{ site.scala_version_suffix }}</artifactId>
-  <version>{{site.version }}</version>
-</dependency>
-{% endhighlight %}
-Version Compatibility: This module is compatible with Redis 2.8.5.
-
-Note that the streaming connectors are currently not part of the binary distribution. You need to link them for cluster execution [explicitly]({{site.baseurl}}/dev/linking).
-
-#### Installing Redis
-Follow the instructions from the [Redis download page](http://redis.io/download).
-
-#### Redis Sink
-A class providing an interface for sending data to Redis.
-The sink can use three different methods for communicating with different types of Redis environments:
-1. Single Redis Server
-2. Redis Cluster
-3. Redis Sentinel
-
-This code shows how to create a sink that communicates with a single Redis server:
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-public static class RedisExampleMapper implements RedisMapper<Tuple2<String, String>>{
-
-    @Override
-    public RedisCommandDescription getCommandDescription() {
-        return new RedisCommandDescription(RedisCommand.HSET, "HASH_NAME");
-    }
-
-    @Override
-    public String getKeyFromData(Tuple2<String, String> data) {
-        return data.f0;
-    }
-
-    @Override
-    public String getValueFromData(Tuple2<String, String> data) {
-        return data.f1;
-    }
-}
-FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("127.0.0.1").build();
-
-DataStream<Tuple2<String, String>> stream = ...;
-stream.addSink(new RedisSink<Tuple2<String, String>>(conf, new RedisExampleMapper()));
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-class RedisExampleMapper extends RedisMapper[(String, String)]{
-  override def getCommandDescription: RedisCommandDescription = {
-    new RedisCommandDescription(RedisCommand.HSET, "HASH_NAME")
-  }
-
-  override def getKeyFromData(data: (String, String)): String = data._1
-
-  override def getValueFromData(data: (String, String)): String = data._2
-}
-val conf = new FlinkJedisPoolConfig.Builder().setHost("127.0.0.1").build()
-stream.addSink(new RedisSink[(String, String)](conf, new RedisExampleMapper))
-{% endhighlight %}
-</div>
-</div>
-
-This example code does the same, but for Redis Cluster:
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-
-FlinkJedisClusterConfig conf = new FlinkJedisClusterConfig.Builder()
-    .setNodes(new HashSet<InetSocketAddress>(Arrays.asList(new InetSocketAddress(5601)))).build();
-
-DataStream<Tuple2<String, String>> stream = ...;
-stream.addSink(new RedisSink<Tuple2<String, String>>(conf, new RedisExampleMapper()));
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val conf = new FlinkJedisClusterConfig.Builder().setNodes(...).build()
-stream.addSink(new RedisSink[(String, String)](conf, new RedisExampleMapper))
-{% endhighlight %}
-</div>
-</div>
-
-This example shows when the Redis environment is with Sentinels:
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-
-FlinkJedisSentinelConfig conf = new FlinkJedisSentinelConfig.Builder()
-    .setMasterName("master").setSentinels(...).build();
-
-DataStream<Tuple2<String, String>> stream = ...;
-stream.addSink(new RedisSink<Tuple2<String, String>>(conf, new RedisExampleMapper()));
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val conf = new FlinkJedisSentinelConfig.Builder().setMasterName("master").setSentinels(...).build()
-stream.addSink(new RedisSink[(String, String)](conf, new RedisExampleMapper))
-{% endhighlight %}
-</div>
-</div>
-
-This section describes all the available data types and the Redis command used for each of them.
-
-<table class="table table-bordered" style="width: 75%">
-    <thead>
-        <tr>
-          <th class="text-center" style="width: 20%">Data Type</th>
-          <th class="text-center" style="width: 25%">Redis Command [Sink]</th>
-          <th class="text-center" style="width: 25%">Redis Command [Source]</th>
-        </tr>
-      </thead>
-      <tbody>
-        <tr>
-            <td>HASH</td><td><a href="http://redis.io/commands/hset">HSET</a></td><td>--NA--</td>
-        </tr>
-        <tr>
-            <td>LIST</td><td>
-                <a href="http://redis.io/commands/rpush">RPUSH</a>,
-                <a href="http://redis.io/commands/lpush">LPUSH</a>
-            </td><td>--NA--</td>
-        </tr>
-        <tr>
-            <td>SET</td><td><a href="http://redis.io/commands/sadd">SADD</a></td><td>--NA--</td>
-        </tr>
-        <tr>
-            <td>PUBSUB</td><td><a href="http://redis.io/commands/publish">PUBLISH</a></td><td>--NA--</td>
-        </tr>
-        <tr>
-            <td>STRING</td><td><a href="http://redis.io/commands/set">SET</a></td><td>--NA--</td>
-        </tr>
-        <tr>
-            <td>HYPER_LOG_LOG</td><td><a href="http://redis.io/commands/pfadd">PFADD</a></td><td>--NA--</td>
-        </tr>
-        <tr>
-            <td>SORTED_SET</td><td><a href="http://redis.io/commands/zadd">ZADD</a></td><td>--NA--</td>
-        </tr>                
-      </tbody>
-</table>
-More about Redis can be found [here](http://redis.io/).

http://git-wip-us.apache.org/repos/asf/flink/blob/8038ae4c/flink-connectors/flink-connector-redis/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-redis/pom.xml b/flink-connectors/flink-connector-redis/pom.xml
deleted file mode 100644
index a348f31..0000000
--- a/flink-connectors/flink-connector-redis/pom.xml
+++ /dev/null
@@ -1,79 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
-         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-
-	<modelVersion>4.0.0</modelVersion>
-
-	<parent>
-		<groupId>org.apache.flink</groupId>
-		<artifactId>flink-connectors</artifactId>
-		<version>1.2-SNAPSHOT</version>
-		<relativePath>..</relativePath>
-	</parent>
-
-	<artifactId>flink-connector-redis_2.10</artifactId>
-	<name>flink-connector-redis</name>
-
-	<packaging>jar</packaging>
-
-	<properties>
-		<jedis.version>2.8.0</jedis.version>
-	</properties>
-
-	<dependencies>
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-streaming-java_2.10</artifactId>
-			<version>${project.version}</version>
-			<scope>provided</scope>
-		</dependency>
-
-		<dependency>
-			<groupId>redis.clients</groupId>
-			<artifactId>jedis</artifactId>
-			<version>${jedis.version}</version>
-		</dependency>
-
-		<dependency>
-			<groupId>com.github.kstyrc</groupId>
-			<artifactId>embedded-redis</artifactId>
-			<version>0.6</version>
-			<scope>test</scope>
-		</dependency>
-
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-streaming-java_2.10</artifactId>
-			<version>${project.version}</version>
-			<scope>test</scope>
-			<type>test-jar</type>
-		</dependency>
-
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-test-utils_2.10</artifactId>
-			<version>${project.version}</version>
-			<scope>test</scope>
-		</dependency>
-	</dependencies>
-    
-</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/8038ae4c/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java b/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java
deleted file mode 100644
index f6b0fd7..0000000
--- a/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java
+++ /dev/null
@@ -1,188 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.redis;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisClusterConfig;
-import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisConfigBase;
-import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
-import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisSentinelConfig;
-import org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainer;
-import org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainerBuilder;
-import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
-import org.apache.flink.streaming.connectors.redis.common.mapper.RedisDataType;
-import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
-import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
-
-import org.apache.flink.util.Preconditions;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-
-/**
- * A sink that delivers data to a Redis channel using the Jedis client.
- * <p> The sink takes two arguments {@link FlinkJedisConfigBase} and {@link RedisMapper}.
- * <p> When {@link FlinkJedisPoolConfig} is passed as the first argument,
- * the sink will create connection using {@link redis.clients.jedis.JedisPool}. Please use this when
- * you want to connect to a single Redis server.
- * <p> When {@link FlinkJedisSentinelConfig} is passed as the first argument, the sink will create connection
- * using {@link redis.clients.jedis.JedisSentinelPool}. Please use this when you want to connect to Sentinel.
- * <p> Please use {@link FlinkJedisClusterConfig} as the first argument if you want to connect to
- * a Redis Cluster.
- *
- * <p>Example:
- *
- * <pre>
- *{@code
- *public static class RedisExampleMapper implements RedisMapper<Tuple2<String, String>> {
- *
- *	private RedisCommand redisCommand;
- *
- *	public RedisExampleMapper(RedisCommand redisCommand){
- *		this.redisCommand = redisCommand;
- *	}
- *	public RedisCommandDescription getCommandDescription() {
- *		return new RedisCommandDescription(redisCommand, REDIS_ADDITIONAL_KEY);
- *	}
- *	public String getKeyFromData(Tuple2<String, String> data) {
- *		return data.f0;
- *	}
- *	public String getValueFromData(Tuple2<String, String> data) {
- *		return data.f1;
- *	}
- *}
- *FlinkJedisPoolConfig jedisPoolConfig = new FlinkJedisPoolConfig.Builder()
- *    .setHost(REDIS_HOST).setPort(REDIS_PORT).build();
- *new RedisSink<Tuple2<String, String>>(jedisPoolConfig, new RedisExampleMapper(RedisCommand.LPUSH));
- *}</pre>
- *
- * @param <IN> Type of the elements this sink receives
- */
-public class RedisSink<IN> extends RichSinkFunction<IN> {
-
-	private static final long serialVersionUID = 1L;
-
-	private static final Logger LOG = LoggerFactory.getLogger(RedisSink.class);
-
-	/**
-	 * This additional key is needed for {@link RedisDataType#HASH} and {@link RedisDataType#SORTED_SET}.
-	 * Other {@link RedisDataType}s work with only two variables, i.e. the name of the list and the value to be added.
-	 * But for {@link RedisDataType#HASH} and {@link RedisDataType#SORTED_SET} we need three variables.
-	 * <p>For {@link RedisDataType#HASH} we need hash name, hash key and element.
-	 * {@code additionalKey} is used as the hash name for {@link RedisDataType#HASH}
-	 * <p>For {@link RedisDataType#SORTED_SET} we need set name, the element and its score.
-	 * {@code additionalKey} is used as the set name for {@link RedisDataType#SORTED_SET}
-	 */
-	private String additionalKey;
-	private RedisMapper<IN> redisSinkMapper;
-	private RedisCommand redisCommand;
-
-	private FlinkJedisConfigBase flinkJedisConfigBase;
-	private RedisCommandsContainer redisCommandsContainer;
-
-	/**
-	 * Creates a new {@link RedisSink} that connects to the Redis server.
-	 *
-	 * @param flinkJedisConfigBase The configuration of {@link FlinkJedisConfigBase}
-	 * @param redisSinkMapper This is used to generate Redis command and key value from incoming elements.
-	 */
-	public RedisSink(FlinkJedisConfigBase flinkJedisConfigBase, RedisMapper<IN> redisSinkMapper) {
-		Preconditions.checkNotNull(flinkJedisConfigBase, "Redis connection pool config should not be null");
-		Preconditions.checkNotNull(redisSinkMapper, "Redis Mapper can not be null");
-		Preconditions.checkNotNull(redisSinkMapper.getCommandDescription(), "Redis Mapper data type description can not be null");
-
-		this.flinkJedisConfigBase = flinkJedisConfigBase;
-
-		this.redisSinkMapper = redisSinkMapper;
-		RedisCommandDescription redisCommandDescription = redisSinkMapper.getCommandDescription();
-		this.redisCommand = redisCommandDescription.getCommand();
-		this.additionalKey = redisCommandDescription.getAdditionalKey();
-	}
-
-	/**
-	 * Called when new data arrives to the sink, and forwards it to Redis channel.
-	 * Depending on the Redis command configured by the mapper (see {@link RedisCommand}),
-	 * a different Redis command will be applied.
-	 * Available commands are RPUSH, LPUSH, SADD, PUBLISH, SET, PFADD, HSET, ZADD.
-	 *
-	 * @param input The incoming data
-	 */
-	@Override
-	public void invoke(IN input) throws Exception {
-		String key = redisSinkMapper.getKeyFromData(input);
-		String value = redisSinkMapper.getValueFromData(input);
-
-		switch (redisCommand) {
-			case RPUSH:
-				this.redisCommandsContainer.rpush(key, value);
-				break;
-			case LPUSH:
-				this.redisCommandsContainer.lpush(key, value);
-				break;
-			case SADD:
-				this.redisCommandsContainer.sadd(key, value);
-				break;
-			case SET:
-				this.redisCommandsContainer.set(key, value);
-				break;
-			case PFADD:
-				this.redisCommandsContainer.pfadd(key, value);
-				break;
-			case PUBLISH:
-				this.redisCommandsContainer.publish(key, value);
-				break;
-			case ZADD:
-				this.redisCommandsContainer.zadd(this.additionalKey, value, key);
-				break;
-			case HSET:
-				this.redisCommandsContainer.hset(this.additionalKey, key, value);
-				break;
-			default:
-				throw new IllegalArgumentException("Cannot process such data type: " + redisCommand);
-		}
-	}
-
-	/**
-	 * Initializes the connection to Redis by either cluster or sentinels or single server.
-	 *
-	 * @throws Exception if the commands container cannot be built or opened; the error is logged and rethrown
-     */
-	@Override
-	public void open(Configuration parameters) throws Exception {
-		try {
-			this.redisCommandsContainer = RedisCommandsContainerBuilder.build(this.flinkJedisConfigBase);
-			this.redisCommandsContainer.open();
-		} catch (Exception e) {
-			LOG.error("Redis has not been properly initialized: ", e);
-			throw e;
-		}
-	}
-
-	/**
-	 * Closes commands container.
-	 * @throws IOException if command container is unable to close.
-	 */
-	@Override
-	public void close() throws IOException {
-		if (redisCommandsContainer != null) {
-			redisCommandsContainer.close();
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/8038ae4c/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisClusterConfig.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisClusterConfig.java b/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisClusterConfig.java
deleted file mode 100644
index 6e6cfe5..0000000
--- a/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisClusterConfig.java
+++ /dev/null
@@ -1,187 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.redis.common.config;
-
-import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
-import org.apache.flink.util.Preconditions;
-import redis.clients.jedis.HostAndPort;
-import redis.clients.jedis.Protocol;
-
-import java.net.InetSocketAddress;
-import java.util.HashSet;
-import java.util.Set;
-
-/**
- * Configuration for Jedis cluster.
- */
-public class FlinkJedisClusterConfig extends FlinkJedisConfigBase {
-	private static final long serialVersionUID = 1L;
-
-	private final Set<InetSocketAddress> nodes;
-	private final int maxRedirections;
-
-
-	/**
-	 * Jedis cluster configuration.
-	 * The set of nodes is mandatory: a {@code null} set throws NullPointerException and an empty set is rejected as well.
-	 *
-	 * @param nodes set of node addresses for JedisCluster; copied defensively into a new set
-	 * @param connectionTimeout socket / connection timeout. The default is 2000
-	 * @param maxRedirections limit of redirections-how much we'll follow MOVED or ASK
-	 * @param maxTotal the maximum number of objects that can be allocated by the pool
-	 * @param maxIdle the cap on the number of "idle" instances in the pool
-	 * @param minIdle the minimum number of idle objects to maintain in the pool
-	 * @throws NullPointerException if parameter {@code nodes} is {@code null}
-	 */
-	private FlinkJedisClusterConfig(Set<InetSocketAddress> nodes, int connectionTimeout, int maxRedirections,
-									int maxTotal, int maxIdle, int minIdle) {
-		super(connectionTimeout, maxTotal, maxIdle, minIdle);
-
-		Preconditions.checkNotNull(nodes, "Node information should be presented");
-		Preconditions.checkArgument(!nodes.isEmpty(), "Redis cluster hosts should not be empty");
-		this.nodes = new HashSet<>(nodes);
-		this.maxRedirections = maxRedirections;
-	}
-
-
-
-	/**
-	 * Returns the configured nodes, each converted to a Jedis {@link HostAndPort}; a new set is built on every call.
-	 *
-	 * @return set of node host/port pairs
-	 */
-	public Set<HostAndPort> getNodes() {
-		Set<HostAndPort> ret = new HashSet<>();
-		for (InetSocketAddress node : nodes) {
-			ret.add(new HostAndPort(node.getHostName(), node.getPort()));
-		}
-		return ret;
-	}
-
-	/**
-	 * Returns limit of redirection.
-	 *
-	 * @return limit of redirection
-	 */
-	public int getMaxRedirections() {
-		return maxRedirections;
-	}
-
-
-	/**
-	 * Builder for initializing {@link FlinkJedisClusterConfig}.
-	 */
-	public static class Builder {
-		private Set<InetSocketAddress> nodes;
-		private int timeout = Protocol.DEFAULT_TIMEOUT;
-		private int maxRedirections = 5;
-		private int maxTotal = GenericObjectPoolConfig.DEFAULT_MAX_TOTAL;
-		private int maxIdle = GenericObjectPoolConfig.DEFAULT_MAX_IDLE;
-		private int minIdle = GenericObjectPoolConfig.DEFAULT_MIN_IDLE;
-
-		/**
-		 * Sets the set of cluster nodes.
-		 *
-		 * @param nodes set of node addresses
-		 * @return Builder itself
-		 */
-		public Builder setNodes(Set<InetSocketAddress> nodes) {
-			this.nodes = nodes;
-			return this;
-		}
-
-		/**
-		 * Sets socket / connection timeout.
-		 *
-		 * @param timeout socket / connection timeout, default value is 2000
-		 * @return Builder itself
-		 */
-		public Builder setTimeout(int timeout) {
-			this.timeout = timeout;
-			return this;
-		}
-
-		/**
-		 * Sets limit of redirection.
-		 *
-		 * @param maxRedirections limit of redirection, default value is 5
-		 * @return Builder itself
-		 */
-		public Builder setMaxRedirections(int maxRedirections) {
-			this.maxRedirections = maxRedirections;
-			return this;
-		}
-
-		/**
-		 * Sets value for the {@code maxTotal} configuration attribute
-		 * for pools to be created with this configuration instance.
-		 *
-		 * @param maxTotal the maximum number of objects that can be allocated by the pool, default value is 8
-		 * @return Builder itself
-		 */
-		public Builder setMaxTotal(int maxTotal) {
-			this.maxTotal = maxTotal;
-			return this;
-		}
-
-		/**
-		 * Sets value for the {@code maxIdle} configuration attribute
-		 * for pools to be created with this configuration instance.
-		 *
-		 * @param maxIdle the cap on the number of "idle" instances in the pool, default value is 8
-		 * @return Builder itself
-		 */
-		public Builder setMaxIdle(int maxIdle) {
-			this.maxIdle = maxIdle;
-			return this;
-		}
-
-		/**
-		 * Sets value for the {@code minIdle} configuration attribute
-		 * for pools to be created with this configuration instance.
-		 *
-		 * @param minIdle the minimum number of idle objects to maintain in the pool, default value is 0
-		 * @return Builder itself
-		 */
-		public Builder setMinIdle(int minIdle) {
-			this.minIdle = minIdle;
-			return this;
-		}
-
-		/**
-		 * Builds a {@link FlinkJedisClusterConfig} from the values set on this builder.
-		 *
-		 * @return the new FlinkJedisClusterConfig
-		 */
-		public FlinkJedisClusterConfig build() {
-			return new FlinkJedisClusterConfig(nodes, timeout, maxRedirections, maxTotal, maxIdle, minIdle);
-		}
-	}
-
-	@Override
-	public String toString() {
-		return "JedisClusterConfig{" +
-			"nodes=" + nodes +
-			", timeout=" + connectionTimeout +
-			", maxRedirections=" + maxRedirections +
-			", maxTotal=" + maxTotal +
-			", maxIdle=" + maxIdle +
-			", minIdle=" + minIdle +
-			'}';
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/8038ae4c/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBase.java b/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBase.java
deleted file mode 100644
index a2489b8..0000000
--- a/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBase.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.redis.common.config;
-
-import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
-import org.apache.flink.util.Preconditions;
-
-import java.io.Serializable;
-
-/**
- * Base class for Flink Redis configurations: holds the connection timeout and pool sizing values, each checked to be non-negative.
- */
-public abstract class FlinkJedisConfigBase implements Serializable {
-	private static final long serialVersionUID = 1L;
-
-	protected final int maxTotal;
-	protected final int maxIdle;
-	protected final int minIdle;
-	protected final int connectionTimeout;
-
-	protected FlinkJedisConfigBase(int connectionTimeout, int maxTotal, int maxIdle, int minIdle){
-		Preconditions.checkArgument(connectionTimeout >= 0, "connection timeout can not be negative");
-		Preconditions.checkArgument(maxTotal >= 0, "maxTotal value can not be negative");
-		Preconditions.checkArgument(maxIdle >= 0, "maxIdle value can not be negative");
-		Preconditions.checkArgument(minIdle >= 0, "minIdle value can not be negative");
-		this.connectionTimeout = connectionTimeout;
-		this.maxTotal = maxTotal;
-		this.maxIdle = maxIdle;
-		this.minIdle = minIdle;
-	}
-
-	/**
-	 * Returns the connection timeout.
-	 *
-	 * @return connection timeout
-	 */
-	public int getConnectionTimeout() {
-		return connectionTimeout;
-	}
-
-	/**
-	 * Get the value for the {@code maxTotal} configuration attribute
-	 * for pools to be created with this configuration instance.
-	 *
-	 * @return  The current setting of {@code maxTotal} for this
-	 *          configuration instance
-	 * @see GenericObjectPoolConfig#getMaxTotal()
-	 */
-	public int getMaxTotal() {
-		return maxTotal;
-	}
-
-	/**
-	 * Get the value for the {@code maxIdle} configuration attribute
-	 * for pools to be created with this configuration instance.
-	 *
-	 * @return  The current setting of {@code maxIdle} for this
-	 *          configuration instance
-	 * @see GenericObjectPoolConfig#getMaxIdle()
-	 */
-	public int getMaxIdle() {
-		return maxIdle;
-	}
-
-	/**
-	 * Get the value for the {@code minIdle} configuration attribute
-	 * for pools to be created with this configuration instance.
-	 *
-	 * @return  The current setting of {@code minIdle} for this
-	 *          configuration instance
-	 * @see GenericObjectPoolConfig#getMinIdle()
-	 */
-	public int getMinIdle() {
-		return minIdle;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/8038ae4c/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisPoolConfig.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisPoolConfig.java b/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisPoolConfig.java
deleted file mode 100644
index d261a35..0000000
--- a/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisPoolConfig.java
+++ /dev/null
@@ -1,224 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.redis.common.config;
-
-import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
-import org.apache.flink.util.Preconditions;
-import redis.clients.jedis.Protocol;
-
-/**
- * Connection settings for a Jedis pool.
- *
- * <p>All fields declared by this class are final; instances are created
- * through the nested {@link Builder}.
- */
-public class FlinkJedisPoolConfig extends FlinkJedisConfigBase {
-
-	private static final long serialVersionUID = 1L;
-
-	private final String host;
-	private final int port;
-	private final int database;
-	private final String password;
-
-	/**
-	 * Creates the Jedis pool configuration. Only {@link Builder#build()} calls this.
-	 * The host is mandatory; a missing host is rejected with a NullPointerException.
-	 *
-	 * @param host hostname or IP
-	 * @param port port, default value is 6379
-	 * @param connectionTimeout socket / connection timeout, default value is 2000 milli second
-	 * @param password password, if any
-	 * @param database database index
-	 * @param maxTotal the maximum number of objects that can be allocated by the pool, default value is 8
-	 * @param maxIdle the cap on the number of "idle" instances in the pool, default value is 8
-	 * @param minIdle the minimum number of idle objects to maintain in the pool, default value is 0
-	 * @throws NullPointerException if parameter {@code host} is {@code null}
-	 */
-	private FlinkJedisPoolConfig(String host, int port, int connectionTimeout, String password, int database,
-								int maxTotal, int maxIdle, int minIdle) {
-		super(connectionTimeout, maxTotal, maxIdle, minIdle);
-		// checkNotNull hands back its argument, so validation and assignment are one step.
-		this.host = Preconditions.checkNotNull(host, "Host information should be presented");
-		this.password = password;
-		this.database = database;
-		this.port = port;
-	}
-
-	/**
-	 * Gets the Redis host.
-	 *
-	 * @return hostname or IP
-	 */
-	public String getHost() {
-		return this.host;
-	}
-
-	/**
-	 * Gets the Redis port.
-	 *
-	 * @return port
-	 */
-	public int getPort() {
-		return this.port;
-	}
-
-	/**
-	 * Gets the database index.
-	 *
-	 * @return database index
-	 */
-	public int getDatabase() {
-		return this.database;
-	}
-
-	/**
-	 * Gets the password.
-	 *
-	 * @return password, or {@code null} if none was set
-	 */
-	public String getPassword() {
-		return this.password;
-	}
-
-	/**
-	 * Fluent builder that collects the settings for a {@link FlinkJedisPoolConfig}.
-	 * Every setter returns the builder itself so calls can be chained.
-	 */
-	public static class Builder {
-		private String host;
-		private String password;
-		private int port = Protocol.DEFAULT_PORT;
-		private int timeout = Protocol.DEFAULT_TIMEOUT;
-		private int database = Protocol.DEFAULT_DATABASE;
-		private int maxTotal = GenericObjectPoolConfig.DEFAULT_MAX_TOTAL;
-		private int maxIdle = GenericObjectPoolConfig.DEFAULT_MAX_IDLE;
-		private int minIdle = GenericObjectPoolConfig.DEFAULT_MIN_IDLE;
-
-		/**
-		 * Sets host.
-		 *
-		 * @param host hostname or IP; mandatory
-		 * @return Builder itself
-		 */
-		public Builder setHost(String host) {
-			this.host = host;
-			return this;
-		}
-
-		/**
-		 * Sets port.
-		 *
-		 * @param port port, default value is 6379
-		 * @return Builder itself
-		 */
-		public Builder setPort(int port) {
-			this.port = port;
-			return this;
-		}
-
-		/**
-		 * Sets timeout.
-		 *
-		 * @param timeout timeout, default value is 2000
-		 * @return Builder itself
-		 */
-		public Builder setTimeout(int timeout) {
-			this.timeout = timeout;
-			return this;
-		}
-
-		/**
-		 * Sets database index.
-		 *
-		 * @param database database index, default value is 0
-		 * @return Builder itself
-		 */
-		public Builder setDatabase(int database) {
-			this.database = database;
-			return this;
-		}
-
-		/**
-		 * Sets password.
-		 *
-		 * @param password password, if any
-		 * @return Builder itself
-		 */
-		public Builder setPassword(String password) {
-			this.password = password;
-			return this;
-		}
-
-		/**
-		 * Sets the {@code maxTotal} attribute for pools created with the built configuration.
-		 *
-		 * @param maxTotal the maximum number of objects that can be allocated by the pool, default value is 8
-		 * @return Builder itself
-		 */
-		public Builder setMaxTotal(int maxTotal) {
-			this.maxTotal = maxTotal;
-			return this;
-		}
-
-		/**
-		 * Sets the {@code maxIdle} attribute for pools created with the built configuration.
-		 *
-		 * @param maxIdle the cap on the number of "idle" instances in the pool, default value is 8
-		 * @return Builder itself
-		 */
-		public Builder setMaxIdle(int maxIdle) {
-			this.maxIdle = maxIdle;
-			return this;
-		}
-
-		/**
-		 * Sets the {@code minIdle} attribute for pools created with the built configuration.
-		 *
-		 * @param minIdle the minimum number of idle objects to maintain in the pool, default value is 0
-		 * @return Builder itself
-		 */
-		public Builder setMinIdle(int minIdle) {
-			this.minIdle = minIdle;
-			return this;
-		}
-
-		/**
-		 * Creates the {@link FlinkJedisPoolConfig} from the collected settings.
-		 *
-		 * @return the new configuration
-		 * @throws NullPointerException if no host was set
-		 */
-		public FlinkJedisPoolConfig build() {
-			return new FlinkJedisPoolConfig(host, port, timeout, password, database, maxTotal, maxIdle, minIdle);
-		}
-	}
-
-	/**
-	 * Renders the settings for diagnostics; the password is not included.
-	 */
-	@Override
-	public String toString() {
-		final StringBuilder text = new StringBuilder("JedisPoolConfig{");
-		text.append("host='").append(host).append('\'');
-		text.append(", port=").append(port);
-		text.append(", timeout=").append(connectionTimeout);
-		text.append(", database=").append(database);
-		text.append(", maxTotal=").append(maxTotal);
-		text.append(", maxIdle=").append(maxIdle);
-		text.append(", minIdle=").append(minIdle);
-		text.append('}');
-		return text.toString();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/8038ae4c/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisSentinelConfig.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisSentinelConfig.java b/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisSentinelConfig.java
deleted file mode 100644
index 2cdb397..0000000
--- a/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisSentinelConfig.java
+++ /dev/null
@@ -1,259 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.redis.common.config;
-
-import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
-import org.apache.flink.util.Preconditions;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import redis.clients.jedis.Protocol;
-
-import java.util.HashSet;
-import java.util.Set;
-
-/**
- * Configuration for Jedis Sentinel pool.
- *
- * <p>Instances are created through the nested {@link Builder}. The sentinel set is
- * copied on construction and only exposed as a read-only view.
- */
-public class FlinkJedisSentinelConfig extends FlinkJedisConfigBase {
-	private static final long serialVersionUID = 1L;
-
-	private static final Logger LOG = LoggerFactory.getLogger(FlinkJedisSentinelConfig.class);
-
-	private final String masterName;
-	private final Set<String> sentinels;
-	private final int soTimeout;
-	private final String password;
-	private final int database;
-
-	/**
-	 * Jedis Sentinels config.
-	 * The master name and sentinels are mandatory; if either is missing a NullPointerException is thrown.
-	 *
-	 * @param masterName master name of the replica set
-	 * @param sentinels set of sentinel hosts
-	 * @param connectionTimeout connection timeout
-	 * @param soTimeout socket timeout
-	 * @param password password, if any
-	 * @param database database index
-	 * @param maxTotal the maximum number of objects that can be allocated by the pool
-	 * @param maxIdle the cap on the number of "idle" instances in the pool
-	 * @param minIdle the minimum number of idle objects to maintain in the pool
-	 *
-	 * @throws NullPointerException if {@code masterName} or {@code sentinels} is {@code null}
-	 * @throws IllegalArgumentException if {@code sentinels} are empty
-	 */
-	private FlinkJedisSentinelConfig(String masterName, Set<String> sentinels,
-									int connectionTimeout, int soTimeout,
-									String password, int database,
-									int maxTotal, int maxIdle, int minIdle) {
-		super(connectionTimeout, maxTotal, maxIdle, minIdle);
-		Preconditions.checkNotNull(masterName, "Master name should be presented");
-		Preconditions.checkNotNull(sentinels, "Sentinels information should be presented");
-		Preconditions.checkArgument(!sentinels.isEmpty(), "Sentinel hosts should not be empty");
-
-		this.masterName = masterName;
-		// Defensive copy: later changes to the caller's set must not leak into this config.
-		this.sentinels = new HashSet<>(sentinels);
-		this.soTimeout = soTimeout;
-		this.password = password;
-		this.database = database;
-	}
-
-	/**
-	 * Returns master name of the replica set.
-	 *
-	 * @return master name of the replica set.
-	 */
-	public String getMasterName() {
-		return masterName;
-	}
-
-	/**
-	 * Returns Sentinels host addresses.
-	 *
-	 * <p>The returned set is a read-only view of the internal copy; attempts to
-	 * modify it throw {@link UnsupportedOperationException}.
-	 *
-	 * @return unmodifiable set of Sentinels host addresses
-	 */
-	public Set<String> getSentinels() {
-		// Fully qualified so that the file's import block does not need to change.
-		return java.util.Collections.unmodifiableSet(sentinels);
-	}
-
-	/**
-	 * Returns socket timeout.
-	 *
-	 * @return socket timeout
-	 */
-	public int getSoTimeout() {
-		return soTimeout;
-	}
-
-	/**
-	 * Returns password.
-	 *
-	 * @return password, or {@code null} if none was set
-	 */
-	public String getPassword() {
-		return password;
-	}
-
-	/**
-	 * Returns database index.
-	 *
-	 * @return database index
-	 */
-	public int getDatabase() {
-		return database;
-	}
-
-	/**
-	 * Builder for initializing {@link FlinkJedisSentinelConfig}.
-	 * Every setter returns the builder itself so calls can be chained.
-	 */
-	public static class Builder {
-		private String masterName;
-		private Set<String> sentinels;
-		private int connectionTimeout = Protocol.DEFAULT_TIMEOUT;
-		private int soTimeout = Protocol.DEFAULT_TIMEOUT;
-		private String password;
-		private int database = Protocol.DEFAULT_DATABASE;
-		private int maxTotal = GenericObjectPoolConfig.DEFAULT_MAX_TOTAL;
-		private int maxIdle = GenericObjectPoolConfig.DEFAULT_MAX_IDLE;
-		private int minIdle = GenericObjectPoolConfig.DEFAULT_MIN_IDLE;
-
-		/**
-		 * Sets master name of the replica set.
-		 *
-		 * @param masterName master name of the replica set; mandatory
-		 * @return Builder itself
-		 */
-		public Builder setMasterName(String masterName) {
-			this.masterName = masterName;
-			return this;
-		}
-
-		/**
-		 * Sets sentinels address.
-		 *
-		 * @param sentinels host set of the sentinels; mandatory and must not be empty
-		 * @return Builder itself
-		 */
-		public Builder setSentinels(Set<String> sentinels) {
-			this.sentinels = sentinels;
-			return this;
-		}
-
-		/**
-		 * Sets connection timeout.
-		 *
-		 * @param connectionTimeout connection timeout, default value is 2000
-		 * @return Builder itself
-		 */
-		public Builder setConnectionTimeout(int connectionTimeout) {
-			this.connectionTimeout = connectionTimeout;
-			return this;
-		}
-
-		/**
-		 * Sets socket timeout.
-		 *
-		 * @param soTimeout socket timeout, default value is 2000
-		 * @return Builder itself
-		 */
-		public Builder setSoTimeout(int soTimeout) {
-			this.soTimeout = soTimeout;
-			return this;
-		}
-
-		/**
-		 * Sets password.
-		 *
-		 * @param password password, if any
-		 * @return Builder itself
-		 */
-		public Builder setPassword(String password) {
-			this.password = password;
-			return this;
-		}
-
-		/**
-		 * Sets database index.
-		 *
-		 * @param database database index, default value is 0
-		 * @return Builder itself
-		 */
-		public Builder setDatabase(int database) {
-			this.database = database;
-			return this;
-		}
-
-		/**
-		 * Sets value for the {@code maxTotal} configuration attribute
-		 * for pools to be created with this configuration instance.
-		 *
-		 * @param maxTotal the maximum number of objects that can be allocated by the pool, default value is 8
-		 * @return Builder itself
-		 */
-		public Builder setMaxTotal(int maxTotal) {
-			this.maxTotal = maxTotal;
-			return this;
-		}
-
-		/**
-		 * Sets value for the {@code maxIdle} configuration attribute
-		 * for pools to be created with this configuration instance.
-		 *
-		 * @param maxIdle the cap on the number of "idle" instances in the pool, default value is 8
-		 * @return Builder itself
-		 */
-		public Builder setMaxIdle(int maxIdle) {
-			this.maxIdle = maxIdle;
-			return this;
-		}
-
-		/**
-		 * Sets value for the {@code minIdle} configuration attribute
-		 * for pools to be created with this configuration instance.
-		 *
-		 * @param minIdle the minimum number of idle objects to maintain in the pool, default value is 0
-		 * @return Builder itself
-		 */
-		public Builder setMinIdle(int minIdle) {
-			this.minIdle = minIdle;
-			return this;
-		}
-
-		/**
-		 * Builds JedisSentinelConfig.
-		 *
-		 * @return JedisSentinelConfig
-		 * @throws NullPointerException if the master name or the sentinels were not set
-		 * @throws IllegalArgumentException if the sentinel set is empty
-		 */
-		public FlinkJedisSentinelConfig build(){
-			return new FlinkJedisSentinelConfig(masterName, sentinels, connectionTimeout, soTimeout,
-				password, database, maxTotal, maxIdle, minIdle);
-		}
-	}
-
-	/**
-	 * Renders the settings for diagnostics; the password and the sentinel hosts are not included.
-	 */
-	@Override
-	public String toString() {
-		return "JedisSentinelConfig{" +
-			"masterName='" + masterName + '\'' +
-			", connectionTimeout=" + connectionTimeout +
-			", soTimeout=" + soTimeout +
-			", database=" + database +
-			", maxTotal=" + maxTotal +
-			", maxIdle=" + maxIdle +
-			", minIdle=" + minIdle +
-			'}';
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/8038ae4c/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java b/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java
deleted file mode 100644
index d6621d6..0000000
--- a/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java
+++ /dev/null
@@ -1,171 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.redis.common.container;
-
-import org.apache.flink.util.Preconditions;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import redis.clients.jedis.JedisCluster;
-
-import java.io.Closeable;
-import java.io.IOException;
-
-/**
- * Redis command container if we want to connect to a Redis cluster.
- */
-public class RedisClusterContainer implements RedisCommandsContainer, Closeable {
-
-	private static final long serialVersionUID = 1L;
-
-	private static final Logger LOG = LoggerFactory.getLogger(RedisClusterContainer.class);
-
-	private transient JedisCluster jedisCluster;
-
-	/**
-	 * Initialize Redis command container for Redis cluster.
-	 *
-	 * @param jedisCluster JedisCluster instance
-	 */
-	public RedisClusterContainer(JedisCluster jedisCluster) {
-		Preconditions.checkNotNull(jedisCluster, "Jedis cluster can not be null");
-
-		this.jedisCluster = jedisCluster;
-	}
-
-	@Override
-	public void open() throws Exception {
-
-		// echo() tries to open a connection and echos back the
-		// message passed as argument. Here we use it to monitor
-		// if we can communicate with the cluster.
-
-		jedisCluster.echo("Test");
-	}
-
-	@Override
-	public void hset(final String key, final String hashField, final String value) {
-		try {
-			jedisCluster.hset(key, hashField, value);
-		} catch (Exception e) {
-			if (LOG.isErrorEnabled()) {
-				LOG.error("Cannot send Redis message with command HSET to hash {} error message {}",
-					key, hashField, e.getMessage());
-			}
-			throw e;
-		}
-	}
-
-	@Override
-	public void rpush(final String listName, final String value) {
-		try {
-			jedisCluster.rpush(listName, value);
-		} catch (Exception e) {
-			if (LOG.isErrorEnabled()) {
-				LOG.error("Cannot send Redis message with command RPUSH to list {} error message: {}",
-					listName, e.getMessage());
-			}
-			throw e;
-		}
-	}
-
-	@Override
-	public void lpush(String listName, String value) {
-		try {
-			jedisCluster.lpush(listName, value);
-		} catch (Exception e) {
-			if (LOG.isErrorEnabled()) {
-				LOG.error("Cannot send Redis message with command LPUSH to list {} error message: {}",
-					listName, e.getMessage());
-			}
-			throw e;
-		}
-	}
-
-	@Override
-	public void sadd(final String setName, final String value) {
-		try {
-			jedisCluster.sadd(setName, value);
-		} catch (Exception e) {
-			if (LOG.isErrorEnabled()) {
-				LOG.error("Cannot send Redis message with command RPUSH to set {} error message {}",
-					setName, e.getMessage());
-			}
-			throw e;
-		}
-	}
-
-	@Override
-	public void publish(final String channelName, final String message) {
-		try {
-			jedisCluster.publish(channelName, message);
-		} catch (Exception e) {
-			if (LOG.isErrorEnabled()) {
-				LOG.error("Cannot send Redis message with command PUBLISH to channel {} error message {}",
-					channelName, e.getMessage());
-			}
-			throw e;
-		}
-	}
-
-	@Override
-	public void set(final String key, final String value) {
-		try {
-			jedisCluster.set(key, value);
-		} catch (Exception e) {
-			if (LOG.isErrorEnabled()) {
-				LOG.error("Cannot send Redis message with command SET to key {} error message {}",
-					key, e.getMessage());
-			}
-			throw e;
-		}
-	}
-
-	@Override
-	public void pfadd(final String key, final String element) {
-		try {
-			jedisCluster.set(key, element);
-		} catch (Exception e) {
-			if (LOG.isErrorEnabled()) {
-				LOG.error("Cannot send Redis message with command PFADD to key {} error message {}",
-					key, e.getMessage());
-			}
-			throw e;
-		}
-	}
-
-	@Override
-	public void zadd(final String key, final String score, final String element) {
-		try {
-			jedisCluster.zadd(key, Double.valueOf(score), element);
-		} catch (Exception e) {
-			if (LOG.isErrorEnabled()) {
-				LOG.error("Cannot send Redis message with command ZADD to set {} error message {}",
-					key, e.getMessage());
-			}
-			throw e;
-		}
-	}
-
-	/**
-	 * Closes the {@link JedisCluster}.
-	 */
-	@Override
-	public void close() throws IOException {
-		this.jedisCluster.close();
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/8038ae4c/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainer.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainer.java b/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainer.java
deleted file mode 100644
index 55dbfc2..0000000
--- a/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainer.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.redis.common.container;
-
-import java.io.IOException;
-import java.io.Serializable;
-
-/**
- * The container for all available Redis commands.
- * Each method corresponds to the Redis command of the same name.
- */
-public interface RedisCommandsContainer extends Serializable {
-
-	/**
-	 * Opens the Jedis container.
-	 * Implementations may use this call to check that Redis is reachable.
-	 *
-	 * @throws Exception if the instance can not be opened properly
-	 */
-	void open() throws Exception;
-
-	/**
-	 * Sets field in the hash stored at key to value.
-	 * If key does not exist, a new key holding a hash is created.
-	 * If field already exists in the hash, it is overwritten.
-	 *
-	 * @param key Hash name
-	 * @param hashField Hash field
-	 * @param value Hash value
-	 */
-	void hset(String key, String hashField, String value);
-
-	/**
-	 * Inserts the specified value at the tail of the list stored at key.
-	 * If key does not exist, it is created as an empty list before performing the push operation.
-	 *
-	 * @param listName Name of the List
-	 * @param value Value to be added
-	 */
-	void rpush(String listName, String value);
-
-	/**
-	 * Inserts the specified value at the head of the list stored at key.
-	 * If key does not exist, it is created as an empty list before performing the push operation.
-	 *
-	 * @param listName Name of the List
-	 * @param value Value to be added
-	 */
-	void lpush(String listName, String value);
-
-	/**
-	 * Adds the specified member to the set stored at key.
-	 * A member that is already part of the set is ignored.
-	 * If key does not exist, a new set is created before adding the specified member.
-	 *
-	 * @param setName Name of the Set
-	 * @param value Value to be added
-	 */
-	void sadd(String setName, String value);
-
-	/**
-	 * Posts a message to the given channel.
-	 *
-	 * @param channelName Name of the channel to which data will be published
-	 * @param message the message
-	 */
-	void publish(String channelName, String message);
-
-	/**
-	 * Sets key to hold the string value. If key already holds a value, it is overwritten,
-	 * regardless of its type. Any previous time to live associated with the key is
-	 * discarded on successful SET operation.
-	 *
-	 * @param key the name of the key whose value is to be set
-	 * @param value the value
-	 */
-	void set(String key, String value);
-
-	/**
-	 * Adds the given element to the HyperLogLog data structure
-	 * stored at the key specified as first argument.
-	 *
-	 * @param key The name of the key
-	 * @param element the element
-	 */
-	void pfadd(String key, String element);
-
-	/**
-	 * Adds the specified member with the specified score to the sorted set stored at key.
-	 *
-	 * @param key The name of the Sorted Set
-	 * @param score Score of the element, passed as a string
-	 * @param element element to be added
-	 */
-	void zadd(String key, String score, String element);
-
-	/**
-	 * Closes the Jedis container.
-	 *
-	 * @throws IOException if the instance can not be closed properly
-	 */
-	void close() throws IOException;
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/8038ae4c/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilder.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilder.java b/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilder.java
deleted file mode 100644
index dc5396a..0000000
--- a/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilder.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.redis.common.container;
-
-import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
-import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisClusterConfig;
-import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisConfigBase;
-import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
-import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisSentinelConfig;
-import org.apache.flink.util.Preconditions;
-import redis.clients.jedis.JedisCluster;
-import redis.clients.jedis.JedisPool;
-import redis.clients.jedis.JedisSentinelPool;
-
-/**
- * Factory methods that turn a Flink Jedis configuration into the matching
- * {@link RedisCommandsContainer}.
- */
-public class RedisCommandsContainerBuilder {
-
-	/**
-	 * Picks the container implementation that matches the runtime type of the configuration.
-	 *
-	 * @param flinkJedisConfigBase configuration base
-	 * @return the container built for the given configuration
-	 * @throws IllegalArgumentException if the configuration is none of the pool, cluster
-	 *         or sentinel configurations (which includes {@code null})
-	 */
-	public static RedisCommandsContainer build(FlinkJedisConfigBase flinkJedisConfigBase) {
-		if (flinkJedisConfigBase instanceof FlinkJedisPoolConfig) {
-			return build((FlinkJedisPoolConfig) flinkJedisConfigBase);
-		}
-		if (flinkJedisConfigBase instanceof FlinkJedisClusterConfig) {
-			return build((FlinkJedisClusterConfig) flinkJedisConfigBase);
-		}
-		if (flinkJedisConfigBase instanceof FlinkJedisSentinelConfig) {
-			return build((FlinkJedisSentinelConfig) flinkJedisConfigBase);
-		}
-		throw new IllegalArgumentException("Jedis configuration not found");
-	}
-
-	/**
-	 * Creates the container for a single Redis server, backed by a {@link JedisPool}.
-	 *
-	 * @param jedisPoolConfig configuration for JedisPool
-	 * @return container for single Redis environment
-	 * @throws NullPointerException if jedisPoolConfig is null
-	 */
-	public static RedisCommandsContainer build(FlinkJedisPoolConfig jedisPoolConfig) {
-		Preconditions.checkNotNull(jedisPoolConfig, "Redis pool config should not be Null");
-
-		GenericObjectPoolConfig poolSettings = newPoolSettings(jedisPoolConfig);
-		return new RedisContainer(new JedisPool(
-			poolSettings,
-			jedisPoolConfig.getHost(),
-			jedisPoolConfig.getPort(),
-			jedisPoolConfig.getConnectionTimeout(),
-			jedisPoolConfig.getPassword(),
-			jedisPoolConfig.getDatabase()));
-	}
-
-	/**
-	 * Creates the container for a Redis Cluster, backed by a {@link JedisCluster}.
-	 *
-	 * @param jedisClusterConfig configuration for JedisCluster
-	 * @return container for Redis Cluster environment
-	 * @throws NullPointerException if jedisClusterConfig is null
-	 */
-	public static RedisCommandsContainer build(FlinkJedisClusterConfig jedisClusterConfig) {
-		Preconditions.checkNotNull(jedisClusterConfig, "Redis cluster config should not be Null");
-
-		GenericObjectPoolConfig poolSettings = newPoolSettings(jedisClusterConfig);
-		return new RedisClusterContainer(new JedisCluster(
-			jedisClusterConfig.getNodes(),
-			jedisClusterConfig.getConnectionTimeout(),
-			jedisClusterConfig.getMaxRedirections(),
-			poolSettings));
-	}
-
-	/**
-	 * Creates the container for a Redis Sentinel setup, backed by a {@link JedisSentinelPool}.
-	 *
-	 * @param jedisSentinelConfig configuration for JedisSentinel
-	 * @return container for Redis sentinel environment
-	 * @throws NullPointerException if jedisSentinelConfig is null
-	 */
-	public static RedisCommandsContainer build(FlinkJedisSentinelConfig jedisSentinelConfig) {
-		Preconditions.checkNotNull(jedisSentinelConfig, "Redis sentinel config should not be Null");
-
-		GenericObjectPoolConfig poolSettings = newPoolSettings(jedisSentinelConfig);
-		return new RedisContainer(new JedisSentinelPool(
-			jedisSentinelConfig.getMasterName(),
-			jedisSentinelConfig.getSentinels(),
-			poolSettings,
-			jedisSentinelConfig.getConnectionTimeout(),
-			jedisSentinelConfig.getSoTimeout(),
-			jedisSentinelConfig.getPassword(),
-			jedisSentinelConfig.getDatabase()));
-	}
-
-	/**
-	 * Copies the pool sizing attributes shared by all configurations into a fresh
-	 * {@link GenericObjectPoolConfig}.
-	 *
-	 * @param settings configuration supplying maxIdle, maxTotal and minIdle
-	 * @return pool configuration carrying those three attributes
-	 */
-	private static GenericObjectPoolConfig newPoolSettings(FlinkJedisConfigBase settings) {
-		GenericObjectPoolConfig poolSettings = new GenericObjectPoolConfig();
-		poolSettings.setMaxIdle(settings.getMaxIdle());
-		poolSettings.setMaxTotal(settings.getMaxTotal());
-		poolSettings.setMinIdle(settings.getMinIdle());
-		return poolSettings;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/8038ae4c/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java b/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java
deleted file mode 100644
index ba4bbda..0000000
--- a/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java
+++ /dev/null
@@ -1,252 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.redis.common.container;
-
-import org.apache.flink.util.Preconditions;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import redis.clients.jedis.Jedis;
-import redis.clients.jedis.JedisPool;
-import redis.clients.jedis.JedisSentinelPool;
-
-import java.io.Closeable;
-import java.io.IOException;
-
-/**
- * Redis command container for a single Redis server or for a Redis Sentinel setup.
- *
- * <p>To connect to a single Redis server, use {@link #RedisContainer(JedisPool)}.
- * To connect to Redis Sentinels, use {@link #RedisContainer(JedisSentinelPool)}.
- *
- * <p>Every operation borrows a {@link Jedis} instance from the configured pool and hands it
- * back once the operation has finished, whether or not it succeeded.
- */
-public class RedisContainer implements RedisCommandsContainer, Closeable {
-
-	private static final long serialVersionUID = 1L;
-
-	private static final Logger LOG = LoggerFactory.getLogger(RedisContainer.class);
-
-	// Exactly one of the two pools is non-null; which one is decided by the constructor used.
-	private transient JedisPool jedisPool;
-	private transient JedisSentinelPool jedisSentinelPool;
-
-	/**
-	 * Use this constructor to connect to a single Redis server.
-	 *
-	 * @param jedisPool JedisPool which actually manages Jedis instances
-	 * @throws NullPointerException if jedisPool is null
-	 */
-	public RedisContainer(JedisPool jedisPool) {
-		Preconditions.checkNotNull(jedisPool, "Jedis Pool can not be null");
-		this.jedisPool = jedisPool;
-		this.jedisSentinelPool = null;
-	}
-
-	/**
-	 * Use this constructor if the Redis environment is clustered with sentinels.
-	 *
-	 * @param sentinelPool SentinelPool which actually manages Jedis instances
-	 * @throws NullPointerException if sentinelPool is null
-	 */
-	public RedisContainer(final JedisSentinelPool sentinelPool) {
-		Preconditions.checkNotNull(sentinelPool, "Jedis Sentinel Pool can not be null");
-		this.jedisPool = null;
-		this.jedisSentinelPool = sentinelPool;
-	}
-
-	/**
-	 * Closes whichever pool this container was created with.
-	 */
-	@Override
-	public void close() throws IOException {
-		if (this.jedisPool != null) {
-			this.jedisPool.close();
-		}
-		if (this.jedisSentinelPool != null) {
-			this.jedisSentinelPool.close();
-		}
-	}
-
-	/**
-	 * Checks that a connection can be borrowed from the pool and that the server answers.
-	 *
-	 * @throws Exception any failure raised while borrowing the connection or sending the echo
-	 */
-	@Override
-	public void open() throws Exception {
-		Jedis jedis = null;
-		try {
-			jedis = getInstance();
-
-			// echo() tries to open a connection and echos back the
-			// message passed as argument. Here we use it to monitor
-			// if we can communicate with the cluster.
-			jedis.echo("Test");
-		} finally {
-			// Hand the probe connection back; otherwise it stays checked out of the pool.
-			releaseInstance(jedis);
-		}
-	}
-
-	/**
-	 * Sends HSET: sets {@code hashField} of the hash stored at {@code key} to {@code value}.
-	 * Failures are logged and rethrown.
-	 */
-	@Override
-	public void hset(final String key, final String hashField, final String value) {
-		Jedis jedis = null;
-		try {
-			jedis = getInstance();
-			jedis.hset(key, hashField, value);
-		} catch (Exception e) {
-			if (LOG.isErrorEnabled()) {
-				LOG.error("Cannot send Redis message with command HSET to key {} and hashField {} error message {}",
-					key, hashField, e.getMessage());
-			}
-			throw e;
-		} finally {
-			releaseInstance(jedis);
-		}
-	}
-
-	/**
-	 * Sends RPUSH for {@code value} on the list {@code listName}.
-	 * Failures are logged and rethrown.
-	 */
-	@Override
-	public void rpush(final String listName, final String value) {
-		Jedis jedis = null;
-		try {
-			jedis = getInstance();
-			jedis.rpush(listName, value);
-		} catch (Exception e) {
-			if (LOG.isErrorEnabled()) {
-				LOG.error("Cannot send Redis message with command RPUSH to list {} error message {}",
-					listName, e.getMessage());
-			}
-			throw e;
-		} finally {
-			releaseInstance(jedis);
-		}
-	}
-
-	/**
-	 * Sends LPUSH for {@code value} on the list {@code listName}.
-	 * Failures are logged and rethrown.
-	 */
-	@Override
-	public void lpush(final String listName, final String value) {
-		Jedis jedis = null;
-		try {
-			jedis = getInstance();
-			jedis.lpush(listName, value);
-		} catch (Exception e) {
-			if (LOG.isErrorEnabled()) {
-				LOG.error("Cannot send Redis message with command LPUSH to list {} error message {}",
-					listName, e.getMessage());
-			}
-			throw e;
-		} finally {
-			releaseInstance(jedis);
-		}
-	}
-
-	/**
-	 * Sends SADD for {@code value} on the set {@code setName}.
-	 * Failures are logged and rethrown.
-	 */
-	@Override
-	public void sadd(final String setName, final String value) {
-		Jedis jedis = null;
-		try {
-			jedis = getInstance();
-			jedis.sadd(setName, value);
-		} catch (Exception e) {
-			if (LOG.isErrorEnabled()) {
-				LOG.error("Cannot send Redis message with command SADD to set {} error message {}",
-					setName, e.getMessage());
-			}
-			throw e;
-		} finally {
-			releaseInstance(jedis);
-		}
-	}
-
-	/**
-	 * Sends PUBLISH for {@code message} on the channel {@code channelName}.
-	 * Failures are logged and rethrown.
-	 */
-	@Override
-	public void publish(final String channelName, final String message) {
-		Jedis jedis = null;
-		try {
-			jedis = getInstance();
-			jedis.publish(channelName, message);
-		} catch (Exception e) {
-			if (LOG.isErrorEnabled()) {
-				LOG.error("Cannot send Redis message with command PUBLISH to channel {} error message {}",
-					channelName, e.getMessage());
-			}
-			throw e;
-		} finally {
-			releaseInstance(jedis);
-		}
-	}
-
-	/**
-	 * Sends SET for {@code key} with {@code value}.
-	 * Failures are logged and rethrown.
-	 */
-	@Override
-	public void set(final String key, final String value) {
-		Jedis jedis = null;
-		try {
-			jedis = getInstance();
-			jedis.set(key, value);
-		} catch (Exception e) {
-			if (LOG.isErrorEnabled()) {
-				LOG.error("Cannot send Redis message with command SET to key {} error message {}",
-					key, e.getMessage());
-			}
-			throw e;
-		} finally {
-			releaseInstance(jedis);
-		}
-	}
-
-	/**
-	 * Sends PFADD for {@code element} on the key {@code key}.
-	 * Failures are logged and rethrown.
-	 */
-	@Override
-	public void pfadd(final String key, final String element) {
-		Jedis jedis = null;
-		try {
-			jedis = getInstance();
-			jedis.pfadd(key, element);
-		} catch (Exception e) {
-			if (LOG.isErrorEnabled()) {
-				LOG.error("Cannot send Redis message with command PFADD to key {} error message {}",
-					key, e.getMessage());
-			}
-			throw e;
-		} finally {
-			releaseInstance(jedis);
-		}
-	}
-
-	/**
-	 * Sends ZADD for {@code element} with the given {@code score} on the sorted set {@code key}.
-	 * The score is parsed as a double; a score that cannot be parsed is logged and rethrown
-	 * like any other failure.
-	 */
-	@Override
-	public void zadd(final String key, final String score, final String element) {
-		Jedis jedis = null;
-		try {
-			jedis = getInstance();
-			jedis.zadd(key, Double.parseDouble(score), element);
-		} catch (Exception e) {
-			if (LOG.isErrorEnabled()) {
-				LOG.error("Cannot send Redis message with command ZADD to set {} error message {}",
-					key, e.getMessage());
-			}
-			throw e;
-		} finally {
-			releaseInstance(jedis);
-		}
-	}
-
-	/**
-	 * Returns a Jedis instance from the pool, preferring the sentinel pool when it is set.
-	 *
-	 * @return the Jedis instance
-	 */
-	private Jedis getInstance() {
-		if (jedisSentinelPool != null) {
-			return jedisSentinelPool.getResource();
-		} else {
-			return jedisPool.getResource();
-		}
-	}
-
-	/**
-	 * Closes the jedis instance after finishing the command.
-	 * A failure while closing is logged and not propagated.
-	 *
-	 * @param jedis The jedis instance, may be {@code null} if none was obtained
-	 */
-	private void releaseInstance(final Jedis jedis) {
-		if (jedis == null) {
-			return;
-		}
-		try {
-			jedis.close();
-		} catch (Exception e) {
-			LOG.error("Failed to close (return) instance to pool", e);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/8038ae4c/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommand.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommand.java b/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommand.java
deleted file mode 100644
index b0661c7..0000000
--- a/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommand.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.redis.common.mapper;
-
-/**
- * All available commands for Redis. Each command belongs to a {@link RedisDataType} group.
- */
-public enum RedisCommand {
-
-	/**
-	 * Insert the specified value at the head of the list stored at key.
-	 * If key does not exist, it is created as empty list before performing the push operation.
-	 */
-	LPUSH(RedisDataType.LIST),
-
-	/**
-	 * Insert the specified value at the tail of the list stored at key.
-	 * If key does not exist, it is created as empty list before performing the push operation.
-	 */
-	RPUSH(RedisDataType.LIST),
-
-	/**
-	 * Add the specified member to the set stored at key.
-	 * Specified member that is already a member of this set is ignored.
-	 */
-	SADD(RedisDataType.SET),
-
-	/**
-	 * Set key to hold the string value. If key already holds a value,
-	 * it is overwritten, regardless of its type.
-	 */
-	SET(RedisDataType.STRING),
-
-	/**
-	 * Adds the element to the HyperLogLog data structure stored at the variable name specified as first argument.
-	 */
-	PFADD(RedisDataType.HYPER_LOG_LOG),
-
-	/**
-	 * Posts a message to the given channel.
-	 */
-	PUBLISH(RedisDataType.PUBSUB),
-
-	/**
-	 * Adds the specified members with the specified score to the sorted set stored at key.
-	 */
-	ZADD(RedisDataType.SORTED_SET),
-
-	/**
-	 * Sets field in the hash stored at key to value. If key does not exist,
-	 * a new key holding a hash is created. If field already exists in the hash, it is overwritten.
-	 */
-	HSET(RedisDataType.HASH);
-
-	/**
-	 * The {@link RedisDataType} this command belongs to. Assigned once per constant and never changed.
-	 */
-	private final RedisDataType redisDataType;
-
-	RedisCommand(RedisDataType redisDataType) {
-		this.redisDataType = redisDataType;
-	}
-
-	/**
-	 * The {@link RedisDataType} this command belongs to.
-	 *
-	 * @return the {@link RedisDataType}
-	 */
-	public RedisDataType getRedisDataType() {
-		return redisDataType;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/8038ae4c/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommandDescription.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommandDescription.java b/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommandDescription.java
deleted file mode 100644
index 1eea48a..0000000
--- a/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommandDescription.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.redis.common.mapper;
-
-import org.apache.flink.util.Preconditions;
-
-import java.io.Serializable;
-
-/**
- * The description of the command type. This must be passed while creating new {@link RedisMapper}.
- *
- * <p>When creating a descriptor for a command in the group {@link RedisDataType#HASH} or
- * {@link RedisDataType#SORTED_SET}, use {@link #RedisCommandDescription(RedisCommand, String)}.
- * If the {@code additionalKey} is {@code null} it will throw {@code IllegalArgumentException}.
- *
- * <p>When the {@link RedisCommand} is not in the group {@link RedisDataType#HASH} or
- * {@link RedisDataType#SORTED_SET}, {@link #RedisCommandDescription(RedisCommand)} can be used.
- */
-public class RedisCommandDescription implements Serializable {
-
-	private static final long serialVersionUID = 1L;
-
-	private final RedisCommand redisCommand;
-
-	/**
-	 * This additional key is needed for the group {@link RedisDataType#HASH} and {@link RedisDataType#SORTED_SET}.
-	 * Other {@link RedisDataType} work with only two variables, i.e. the name of the list and the value to be added.
-	 * But for {@link RedisDataType#HASH} and {@link RedisDataType#SORTED_SET} we need three variables.
-	 * <p>For {@link RedisDataType#HASH} we need hash name, hash key and element.
-	 * {@link #getAdditionalKey()} is used as hash name for {@link RedisDataType#HASH}.
-	 * <p>For {@link RedisDataType#SORTED_SET} we need set name, the element and its score.
-	 * {@link #getAdditionalKey()} is used as set name for {@link RedisDataType#SORTED_SET}.
-	 */
-	private final String additionalKey;
-
-	/**
-	 * Use this constructor when data type is {@link RedisDataType#HASH} or {@link RedisDataType#SORTED_SET}.
-	 * For any other data type no check is made on {@code additionalKey}.
-	 *
-	 * @param redisCommand the redis command type {@link RedisCommand}
-	 * @param additionalKey additional key for Hash and Sorted set data type
-	 * @throws NullPointerException if {@code redisCommand} is {@code null}
-	 * @throws IllegalArgumentException if the command belongs to {@link RedisDataType#HASH} or
-	 *         {@link RedisDataType#SORTED_SET} and {@code additionalKey} is {@code null}
-	 */
-	public RedisCommandDescription(RedisCommand redisCommand, String additionalKey) {
-		Preconditions.checkNotNull(redisCommand, "Redis command type can not be null");
-
-		final RedisDataType dataType = redisCommand.getRedisDataType();
-		final boolean needsAdditionalKey =
-			dataType == RedisDataType.HASH || dataType == RedisDataType.SORTED_SET;
-		if (needsAdditionalKey && additionalKey == null) {
-			throw new IllegalArgumentException("Hash and Sorted Set should have additional key");
-		}
-
-		this.redisCommand = redisCommand;
-		this.additionalKey = additionalKey;
-	}
-
-	/**
-	 * Use this constructor when command type is not in group {@link RedisDataType#HASH} or {@link RedisDataType#SORTED_SET}.
-	 *
-	 * @param redisCommand the redis data type {@link RedisCommand}
-	 * @throws NullPointerException if {@code redisCommand} is {@code null}
-	 * @throws IllegalArgumentException if the command belongs to {@link RedisDataType#HASH} or
-	 *         {@link RedisDataType#SORTED_SET}, because no additional key is supplied
-	 */
-	public RedisCommandDescription(RedisCommand redisCommand) {
-		this(redisCommand, null);
-	}
-
-	/**
-	 * Returns the {@link RedisCommand}.
-	 *
-	 * @return the command type of the mapping
-	 */
-	public RedisCommand getCommand() {
-		return redisCommand;
-	}
-
-	/**
-	 * Returns the additional key, which is required for {@link RedisDataType#HASH} and
-	 * {@link RedisDataType#SORTED_SET} and may be {@code null} otherwise.
-	 *
-	 * @return the additional key
-	 */
-	public String getAdditionalKey() {
-		return additionalKey;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/8038ae4c/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisDataType.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisDataType.java b/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisDataType.java
deleted file mode 100644
index 6e3997c..0000000
--- a/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisDataType.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.redis.common.mapper;
-
-/**
- * All available data types for Redis.
- */
-public enum RedisDataType {
-
-	/**
-	 * Strings are the most basic kind of Redis value. Redis Strings are binary safe,
-	 * which means that a Redis string can contain any kind of data, for instance a JPEG image or a serialized Ruby object.
-	 * A String value can be at most 512 Megabytes in length.
-	 */
-	STRING,
-
-	/**
-	 * Redis Hashes are maps between string fields and string values.
-	 */
-	HASH,
-
-	/**
-	 * Redis Lists are simply lists of strings, sorted by insertion order.
-	 */
-	LIST,
-
-	/**
-	 * Redis Sets are an unordered collection of Strings.
-	 */
-	SET,
-
-	/**
-	 * Redis Sorted Sets are, similarly to Redis Sets, non-repeating collections of Strings.
-	 * The difference is that every member of a Sorted Set is associated with a score,
-	 * which is used to keep the sorted set ordered, from the smallest to the greatest score.
-	 * While members are unique, scores may be repeated.
-	 */
-	SORTED_SET,
-
-	/**
-	 * HyperLogLog is a probabilistic data structure used in order to count unique things.
-	 */
-	HYPER_LOG_LOG,
-
-	/**
-	 * Redis implementation of the publish and subscribe paradigm. Published messages are categorized into channels,
-	 * without knowledge of what (if any) subscribers there may be.
-	 * Subscribers express interest in one or more channels, and only receive messages
-	 * that are of interest, without knowledge of what (if any) publishers there are.
-	 */
-	PUBSUB
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/8038ae4c/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisMapper.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisMapper.java b/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisMapper.java
deleted file mode 100644
index 63fed19..0000000
--- a/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisMapper.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.redis.common.mapper;
-
-import org.apache.flink.api.common.functions.Function;
-
-import java.io.Serializable;
-
-/**
- * Function that creates the description of how the input data should be mapped to a Redis command.
- *
- *<p>Example:
- *<pre>{@code
- *private static class RedisTestMapper implements RedisMapper<Tuple2<String, String>> {
- *    public RedisCommandDescription getCommandDescription() {
- *        return new RedisCommandDescription(RedisCommand.PUBLISH);
- *    }
- *    public String getKeyFromData(Tuple2<String, String> data) {
- *        return data.f0;
- *    }
- *    public String getValueFromData(Tuple2<String, String> data) {
- *        return data.f1;
- *    }
- *}
- *}</pre>
- *
- * @param <T> The type of the element handled by this {@code RedisMapper}
- */
-public interface RedisMapper<T> extends Function, Serializable {
-
-	/**
-	 * Returns the descriptor which defines the Redis command, and with it the data type, to use.
-	 *
-	 * @return data type descriptor
-	 */
-	RedisCommandDescription getCommandDescription();
-
-	/**
-	 * Extracts key from data.
-	 *
-	 * @param data source data
-	 * @return key
-	 */
-	String getKeyFromData(T data);
-
-	/**
-	 * Extracts value from data.
-	 *
-	 * @param data source data
-	 * @return value
-	 */
-	String getValueFromData(T data);
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/8038ae4c/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisITCaseBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisITCaseBase.java b/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisITCaseBase.java
deleted file mode 100644
index 7d98f2d..0000000
--- a/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisITCaseBase.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.redis;
-
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import redis.embedded.RedisServer;
-
-import java.io.IOException;
-
-import static org.apache.flink.util.NetUtils.getAvailablePort;
-
-/**
- * Base class for Redis integration tests. An embedded {@link RedisServer} is started on
- * {@link #REDIS_PORT} (obtained from {@code NetUtils.getAvailablePort()}) before the tests
- * of a class run, and is stopped once they have finished.
- */
-public abstract class RedisITCaseBase extends StreamingMultipleProgramsTestBase {
-
-	public static final int REDIS_PORT = getAvailablePort();
-	public static final String REDIS_HOST = "127.0.0.1";
-
-	private static RedisServer redisServer;
-
-	@BeforeClass
-	public static void createRedisServer() throws IOException, InterruptedException {
-		redisServer = new RedisServer(REDIS_PORT);
-		redisServer.start();
-	}
-
-	@AfterClass
-	public static void stopRedisServer() {
-		// @AfterClass also runs when createRedisServer() threw before assigning the field;
-		// without this guard the NullPointerException would be reported on top of the real failure.
-		if (redisServer != null) {
-			redisServer.stop();
-			redisServer = null;
-		}
-	}
-}