You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bahir.apache.org by lr...@apache.org on 2016/08/20 02:10:57 UTC

[1/5] bahir-flink git commit: [BAHIR-54] Add .travis.yml file

Repository: bahir-flink
Updated Branches:
  refs/heads/master ab6723dc4 -> b2955a749


[BAHIR-54] Add .travis.yml file


Project: http://git-wip-us.apache.org/repos/asf/bahir-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/bahir-flink/commit/d718de7e
Tree: http://git-wip-us.apache.org/repos/asf/bahir-flink/tree/d718de7e
Diff: http://git-wip-us.apache.org/repos/asf/bahir-flink/diff/d718de7e

Branch: refs/heads/master
Commit: d718de7e6b903b6aa15d8b0a4c910ec6a61cf482
Parents: ab6723d
Author: Robert Metzger <rm...@apache.org>
Authored: Fri Aug 19 10:33:11 2016 +0200
Committer: Luciano Resende <lr...@apache.org>
Committed: Fri Aug 19 19:07:48 2016 -0700

----------------------------------------------------------------------
 .travis.yml | 31 +++++++++++++++++++++++++++++++
 1 file changed, 31 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/d718de7e/.travis.yml
----------------------------------------------------------------------
diff --git a/.travis.yml b/.travis.yml
new file mode 100644
index 0000000..0d123c4
--- /dev/null
+++ b/.travis.yml
@@ -0,0 +1,31 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+language: java
+
+env:
+  - FLINK_VERSION="1.0.3"
+  - FLINK_VERSION="1.1.1"
+
+jdk:
+  - oraclejdk8
+  - oraclejdk7
+  - openjdk7
+
+
+
+script: mvn clean verify -Dflink.version=$FLINK_VERSION
\ No newline at end of file


[2/5] bahir-flink git commit: [BAHIR-54] Remove scala support from parent pom

Posted by lr...@apache.org.
[BAHIR-54] Remove scala support from parent pom


Project: http://git-wip-us.apache.org/repos/asf/bahir-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/bahir-flink/commit/2d28f10b
Tree: http://git-wip-us.apache.org/repos/asf/bahir-flink/tree/2d28f10b
Diff: http://git-wip-us.apache.org/repos/asf/bahir-flink/diff/2d28f10b

Branch: refs/heads/master
Commit: 2d28f10b2316e8a71bd804411811b3f13d9fe607
Parents: d718de7
Author: Robert Metzger <rm...@apache.org>
Authored: Fri Aug 19 10:54:53 2016 +0200
Committer: Luciano Resende <lr...@apache.org>
Committed: Fri Aug 19 19:08:16 2016 -0700

----------------------------------------------------------------------
 pom.xml | 235 +----------------------------------------------------------
 1 file changed, 3 insertions(+), 232 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/2d28f10b/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index d2d9ab6..8ed5789 100644
--- a/pom.xml
+++ b/pom.xml
@@ -75,7 +75,7 @@
   </mailingLists>
 
   <modules>
-
+    <module>flink-connector-redis</module>
   </modules>
 
   <properties>
@@ -84,7 +84,6 @@
 
     <!-- General project dependencies version -->
     <java.version>1.7</java.version>
-    <scala.version>2.11.8</scala.version>
     <scala.binary.version>2.11</scala.binary.version>
 
     <maven.version>3.3.9</maven.version>
@@ -126,63 +125,9 @@
     </pluginRepository>
   </pluginRepositories>
   
-  <dependencies>
-    <!--
-         This is needed by the scalatest plugin, and so is declared here to be available in
-         all child modules, just as scalatest is run in all children
-    -->
-    <dependency>
-      <groupId>org.scalatest</groupId>
-      <artifactId>scalatest_${scala.binary.version}</artifactId>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>junit</groupId>
-      <artifactId>junit</artifactId>
-      <scope>test</scope>
-    </dependency>
-  </dependencies>
 
   <dependencyManagement>
     <dependencies>
-      <!-- Scala Related Dependencies -->
-      <dependency>
-        <groupId>org.scala-lang</groupId>
-        <artifactId>scala-compiler</artifactId>
-        <version>${scala.version}</version>
-      </dependency>
-      <dependency>
-        <groupId>org.scala-lang</groupId>
-        <artifactId>scala-reflect</artifactId>
-        <version>${scala.version}</version>
-      </dependency>
-      <dependency>
-        <groupId>org.scala-lang</groupId>
-        <artifactId>scala-library</artifactId>
-        <version>${scala.version}</version>
-      </dependency>
-      <dependency>
-        <groupId>org.scala-lang</groupId>
-        <artifactId>scala-actors</artifactId>
-        <version>${scala.version}</version>
-      </dependency>
-      <dependency>
-        <groupId>org.scala-lang</groupId>
-        <artifactId>scalap</artifactId>
-        <version>${scala.version}</version>
-      </dependency>
-      <dependency>
-        <groupId>org.scalatest</groupId>
-        <artifactId>scalatest_${scala.binary.version}</artifactId>
-        <version>2.2.6</version>
-        <scope>test</scope>
-      </dependency>
-      <dependency>
-        <groupId>org.scalacheck</groupId>
-        <artifactId>scalacheck_${scala.binary.version}</artifactId>
-        <version>1.12.5</version> <!-- 1.13.0 appears incompatible with scalatest 2.2.6 -->
-        <scope>test</scope>
-      </dependency>
 
       <!-- Test Dependencies -->
       <dependency>
@@ -219,15 +164,6 @@
   </dependencyManagement>
 
   <build>
-    <resources>
-      <resource>
-        <directory>${basedir}/python</directory>
-        <includes>
-          <include>**/*.py</include>
-        </includes>
-      </resource>
-    </resources>
-
     <pluginManagement>
       <plugins>
         <plugin>
@@ -292,7 +228,6 @@
               <exclude>**/dependency-reduced-pom.xml</exclude>
               <exclude>**/target/**</exclude>
               <exclude>**/README.md</exclude>
-              <exclude>**/examples/data/*.txt</exclude>
             </excludes>
           </configuration>
         </plugin>
@@ -301,69 +236,7 @@
           <artifactId>build-helper-maven-plugin</artifactId>
           <version>1.10</version>
         </plugin>
-        <plugin>
-          <groupId>net.alchim31.maven</groupId>
-          <artifactId>scala-maven-plugin</artifactId>
-          <version>3.2.2</version>
-          <executions>
-            <execution>
-              <id>eclipse-add-source</id>
-              <goals>
-                <goal>add-source</goal>
-              </goals>
-            </execution>
-            <execution>
-              <id>scala-compile-first</id>
-              <phase>process-resources</phase>
-              <goals>
-                <goal>compile</goal>
-              </goals>
-            </execution>
-            <execution>
-              <id>scala-test-compile-first</id>
-              <phase>process-test-resources</phase>
-              <goals>
-                <goal>testCompile</goal>
-              </goals>
-              <configuration>
-                <source>${java.version}</source>
-                <target>${java.version}</target>
-                <encoding>UTF-8</encoding>
-              </configuration>
-            </execution>
-            <execution>
-              <id>attach-scaladocs</id>
-              <phase>verify</phase>
-              <goals>
-                <goal>doc-jar</goal>
-              </goals>
-            </execution>
-          </executions>
-          <configuration>
-            <scalaVersion>${scala.version}</scalaVersion>
-            <recompileMode>incremental</recompileMode>
-            <useZincServer>true</useZincServer>
-            <args>
-              <arg>-unchecked</arg>
-              <arg>-deprecation</arg>
-              <arg>-feature</arg>
-            </args>
-            <jvmArgs>
-              <jvmArg>-Xms1024m</jvmArg>
-              <jvmArg>-Xmx1024m</jvmArg>
-              <jvmArg>-XX:PermSize=${PermGen}</jvmArg>
-              <jvmArg>-XX:MaxPermSize=${MaxPermGen}</jvmArg>
-              <jvmArg>-XX:ReservedCodeCacheSize=${CodeCacheSize}</jvmArg>
-            </jvmArgs>
-            <javacArgs>
-              <javacArg>-source</javacArg>
-              <javacArg>${java.version}</javacArg>
-              <javacArg>-target</javacArg>
-              <javacArg>${java.version}</javacArg>
-              <javacArg>-Xlint:all,-serial,-path</javacArg>
-            </javacArgs>
-          </configuration>
-        </plugin>
+
         <plugin>
           <groupId>org.apache.maven.plugins</groupId>
           <artifactId>maven-compiler-plugin</artifactId>
@@ -414,36 +287,7 @@
             </execution>
           </executions>
         </plugin>
-        <!-- Scalatest runs all Scala tests -->
-        <plugin>
-          <groupId>org.scalatest</groupId>
-          <artifactId>scalatest-maven-plugin</artifactId>
-          <version>1.0</version>
-          <!-- Note config is repeated in surefire config -->
-          <configuration>
-            <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
-            <junitxml>.</junitxml>
-            <filereports>SparkTestSuite.txt</filereports>
-            <argLine>-ea -Xmx3g -XX:MaxPermSize=${MaxPermGen} -XX:ReservedCodeCacheSize=${CodeCacheSize}</argLine>
-            <stderr />
-            <systemProperties>
-              <log4j.configuration>file:src/test/resources/log4j.properties</log4j.configuration>
-              <derby.system.durability>test</derby.system.durability>
-              <java.awt.headless>true</java.awt.headless>
-              <java.io.tmpdir>${project.build.directory}/tmp</java.io.tmpdir>
-              <!-- Needed by sql/hive tests. -->
-              <test.src.tables>__not_used__</test.src.tables>
-            </systemProperties>
-          </configuration>
-          <executions>
-            <execution>
-              <id>test</id>
-              <goals>
-                <goal>test</goal>
-              </goals>
-            </execution>
-          </executions>
-        </plugin>
+  
         <plugin>
           <groupId>org.apache.maven.plugins</groupId>
           <artifactId>maven-jar-plugin</artifactId>
@@ -475,19 +319,6 @@
           <groupId>org.apache.maven.plugins</groupId>
           <artifactId>maven-clean-plugin</artifactId>
           <version>3.0.0</version>
-          <configuration>
-            <filesets>
-              <fileset>
-                <directory>work</directory>
-              </fileset>
-              <fileset>
-                <directory>checkpoint</directory>
-              </fileset>
-              <fileset>
-                <directory>lib_managed</directory>
-              </fileset>
-            </filesets>
-          </configuration>
         </plugin>
         <plugin>
           <groupId>org.apache.maven.plugins</groupId>
@@ -640,30 +471,6 @@
         <artifactId>maven-source-plugin</artifactId>
       </plugin>
       <plugin>
-        <groupId>org.scalastyle</groupId>
-        <artifactId>scalastyle-maven-plugin</artifactId>
-        <version>0.8.0</version>
-        <configuration>
-          <verbose>false</verbose>
-          <failOnViolation>true</failOnViolation>
-          <includeTestSourceDirectory>true</includeTestSourceDirectory>
-          <failOnWarning>false</failOnWarning>
-          <sourceDirectory>${basedir}/src/main/scala</sourceDirectory>
-          <testSourceDirectories>${basedir}/src/test/scala</testSourceDirectories>
-          <configLocation>scalastyle-config.xml</configLocation>
-          <outputFile>${basedir}/target/scalastyle-output.xml</outputFile>
-          <inputEncoding>${project.build.sourceEncoding}</inputEncoding>
-          <outputEncoding>${project.reporting.outputEncoding}</outputEncoding>
-        </configuration>
-        <executions>
-          <execution>
-            <goals>
-              <goal>check</goal>
-            </goals>
-          </execution>
-        </executions>
-      </plugin>
-      <plugin>
         <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-checkstyle-plugin</artifactId>
         <version>2.17</version>
@@ -687,34 +494,11 @@
         </executions>
       </plugin>
 
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-antrun-plugin</artifactId>
-        <executions>
-          <execution>
-            <id>create-tmp-dir</id>
-            <phase>generate-test-resources</phase>
-            <goals>
-              <goal>run</goal>
-            </goals>
-            <configuration>
-              <target>
-                <mkdir dir="${project.build.directory}/tmp" />
-              </target>
-            </configuration>
-          </execution>
-        </executions>
-      </plugin>
-
       <!-- Enable surefire and scalatest in all children, in one place: -->
       <plugin>
         <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-surefire-plugin</artifactId>
       </plugin>
-      <plugin>
-        <groupId>org.scalatest</groupId>
-        <artifactId>scalatest-maven-plugin</artifactId>
-      </plugin>
       <!-- Build test-jar's for all projects, since some projects depend on tests from others -->
       <plugin>
         <groupId>org.apache.maven.plugins</groupId>
@@ -784,20 +568,8 @@
         <property><name>scala-2.10</name></property>
       </activation>
       <properties>
-        <scala.version>2.10.6</scala.version>
         <scala.binary.version>2.10</scala.binary.version>
-        <jline.version>${scala.version}</jline.version>
-        <jline.groupid>org.scala-lang</jline.groupid>
       </properties>
-      <dependencyManagement>
-        <dependencies>
-          <dependency>
-            <groupId>${jline.groupid}</groupId>
-            <artifactId>jline</artifactId>
-            <version>${jline.version}</version>
-          </dependency>
-        </dependencies>
-      </dependencyManagement>
       <build>
         <plugins>
           <plugin>
@@ -831,7 +603,6 @@
         <property><name>!scala-2.10</name></property>
       </activation>
       <properties>
-        <scala.version>2.11.8</scala.version>
         <scala.binary.version>2.11</scala.binary.version>
       </properties>
       <build>


[4/5] bahir-flink git commit: [BAHIR-55] Add Redis connector from Flink

Posted by lr...@apache.org.
http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/b2955a74/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisITCaseBase.java
----------------------------------------------------------------------
diff --git a/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisITCaseBase.java b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisITCaseBase.java
new file mode 100644
index 0000000..18cdc64
--- /dev/null
+++ b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisITCaseBase.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis;
+
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import redis.embedded.RedisServer;
+
+import java.io.IOException;
+
+import static org.apache.flink.util.NetUtils.getAvailablePort;
+
+public abstract class RedisITCaseBase extends StreamingMultipleProgramsTestBase {
+
+    public static final int REDIS_PORT = getAvailablePort();
+    public static final String REDIS_HOST = "127.0.0.1";
+
+    private static RedisServer redisServer;
+
+    @BeforeClass
+    public static void createRedisServer() throws IOException, InterruptedException {
+        redisServer = new RedisServer(REDIS_PORT);
+        redisServer.start();
+    }
+
+    @AfterClass
+    public static void stopRedisServer(){
+        redisServer.stop();
+    }
+}

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/b2955a74/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSentinelClusterTest.java
----------------------------------------------------------------------
diff --git a/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSentinelClusterTest.java b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSentinelClusterTest.java
new file mode 100644
index 0000000..d2637f7
--- /dev/null
+++ b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSentinelClusterTest.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis;
+
+import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisSentinelConfig;
+import org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainer;
+import org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainerBuilder;
+import org.apache.flink.util.TestLogger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import redis.clients.jedis.Jedis;
+import redis.clients.jedis.JedisSentinelPool;
+import redis.embedded.RedisCluster;
+import redis.embedded.util.JedisUtil;
+
+import java.io.IOException;
+import java.util.Set;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.AfterClass;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class RedisSentinelClusterTest extends TestLogger {
+    private static final Logger LOG = LoggerFactory.getLogger(RedisSentinelClusterTest.class);
+
+    private static RedisCluster cluster;
+    private static final String REDIS_MASTER = "master";
+    private static final String TEST_KEY = "testKey";
+    private static final String TEST_VALUE = "testValue";
+
+    private JedisSentinelPool jedisSentinelPool;
+    private FlinkJedisSentinelConfig jedisSentinelConfig;
+
+    @BeforeClass
+    public static void setUpCluster(){
+        cluster = RedisCluster.builder().ephemeralSentinels().quorumSize(1)
+            .replicationGroup(REDIS_MASTER, 1)
+            .build();
+        cluster.start();
+        LOG.info("Started redis cluster {}", cluster);
+    }
+
+    @Before
+    public void setUp() {
+        Set<String> hosts = JedisUtil.sentinelHosts(cluster);
+        jedisSentinelConfig = new FlinkJedisSentinelConfig.Builder().setMasterName(REDIS_MASTER)
+            .setSentinels(hosts).build();
+        jedisSentinelPool = new JedisSentinelPool(jedisSentinelConfig.getMasterName(),
+            jedisSentinelConfig.getSentinels());
+    }
+
+    @Test
+    public void testRedisSentinelOperation() {
+        RedisCommandsContainer redisContainer = RedisCommandsContainerBuilder.build(jedisSentinelConfig);
+        Jedis jedis = null;
+        try{
+            jedis = jedisSentinelPool.getResource();
+            redisContainer.set(TEST_KEY, TEST_VALUE);
+            assertEquals(TEST_VALUE, jedis.get(TEST_KEY));
+        }finally {
+            if (jedis != null){
+                jedis.close();
+            }
+        }
+    }
+
+    @After
+    public void tearDown() throws IOException {
+        if (jedisSentinelPool != null) {
+            jedisSentinelPool.close();
+        }
+    }
+
+    @AfterClass
+    public static void tearDownCluster() throws IOException {
+        if (cluster != null) {
+            cluster.stop();
+            LOG.info("Stopped redis cluster");
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/b2955a74/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkITCase.java
----------------------------------------------------------------------
diff --git a/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkITCase.java b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkITCase.java
new file mode 100644
index 0000000..e071894
--- /dev/null
+++ b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkITCase.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import redis.clients.jedis.Jedis;
+
+import static org.junit.Assert.assertEquals;
+
+public class RedisSinkITCase extends RedisITCaseBase {
+
+    private FlinkJedisPoolConfig jedisPoolConfig;
+    private static final Long NUM_ELEMENTS = 20L;
+    private static final String REDIS_KEY = "TEST_KEY";
+    private static final String REDIS_ADDITIONAL_KEY = "TEST_ADDITIONAL_KEY";
+
+    StreamExecutionEnvironment env;
+
+
+    private Jedis jedis;
+
+    @Before
+    public void setUp(){
+        jedisPoolConfig = new FlinkJedisPoolConfig.Builder()
+            .setHost(REDIS_HOST)
+            .setPort(REDIS_PORT).build();
+        jedis = new Jedis(REDIS_HOST, REDIS_PORT);
+        env = StreamExecutionEnvironment.getExecutionEnvironment();
+    }
+
+    @Test
+    public void testRedisListDataType() throws Exception {
+        DataStreamSource<Tuple2<String, String>> source = env.addSource(new TestSourceFunction());
+        RedisSink<Tuple2<String, String>> redisSink = new RedisSink<>(jedisPoolConfig,
+            new RedisCommandMapper(RedisCommand.LPUSH));
+
+        source.addSink(redisSink);
+        env.execute("Test Redis List Data Type");
+
+        assertEquals(NUM_ELEMENTS, jedis.llen(REDIS_KEY));
+
+        jedis.del(REDIS_KEY);
+    }
+
+    @Test
+    public void testRedisSetDataType() throws Exception {
+        DataStreamSource<Tuple2<String, String>> source = env.addSource(new TestSourceFunction());
+        RedisSink<Tuple2<String, String>> redisSink = new RedisSink<>(jedisPoolConfig,
+            new RedisCommandMapper(RedisCommand.SADD));
+
+        source.addSink(redisSink);
+        env.execute("Test Redis Set Data Type");
+
+        assertEquals(NUM_ELEMENTS, jedis.scard(REDIS_KEY));
+
+        jedis.del(REDIS_KEY);
+    }
+
+    @Test
+    public void testRedisHyperLogLogDataType() throws Exception {
+        DataStreamSource<Tuple2<String, String>> source = env.addSource(new TestSourceFunction());
+        RedisSink<Tuple2<String, String>> redisSink = new RedisSink<>(jedisPoolConfig,
+            new RedisCommandMapper(RedisCommand.PFADD));
+
+        source.addSink(redisSink);
+        env.execute("Test Redis Hyper Log Log Data Type");
+
+        assertEquals(NUM_ELEMENTS, Long.valueOf(jedis.pfcount(REDIS_KEY)));
+
+        jedis.del(REDIS_KEY);
+    }
+
+    @Test
+    public void testRedisSortedSetDataType() throws Exception {
+        DataStreamSource<Tuple2<String, String>> source = env.addSource(new TestSourceFunctionSortedSet());
+        RedisSink<Tuple2<String, String>> redisSink = new RedisSink<>(jedisPoolConfig,
+            new RedisAdditionalDataMapper(RedisCommand.ZADD));
+
+        source.addSink(redisSink);
+        env.execute("Test Redis Sorted Set Data Type");
+
+        assertEquals(NUM_ELEMENTS, jedis.zcard(REDIS_ADDITIONAL_KEY));
+
+        jedis.del(REDIS_ADDITIONAL_KEY);
+    }
+
+    @Test
+    public void testRedisHashDataType() throws Exception {
+        DataStreamSource<Tuple2<String, String>> source = env.addSource(new TestSourceFunctionHash());
+        RedisSink<Tuple2<String, String>> redisSink = new RedisSink<>(jedisPoolConfig,
+            new RedisAdditionalDataMapper(RedisCommand.HSET));
+
+        source.addSink(redisSink);
+        env.execute("Test Redis Hash Data Type");
+
+        assertEquals(NUM_ELEMENTS, jedis.hlen(REDIS_ADDITIONAL_KEY));
+
+        jedis.del(REDIS_ADDITIONAL_KEY);
+    }
+
+    @After
+    public void tearDown(){
+        if(jedis != null){
+            jedis.close();
+        }
+    }
+
+    private static class TestSourceFunction implements SourceFunction<Tuple2<String, String>> {
+        private static final long serialVersionUID = 1L;
+
+        private volatile boolean running = true;
+
+        @Override
+        public void run(SourceContext<Tuple2<String, String>> ctx) throws Exception {
+            for (int i = 0; i < NUM_ELEMENTS && running; i++) {
+                ctx.collect(new Tuple2<>(REDIS_KEY, "message #" + i));
+            }
+        }
+
+        @Override
+        public void cancel() {
+            running = false;
+        }
+    }
+
+    private static class TestSourceFunctionHash implements SourceFunction<Tuple2<String, String>> {
+        private static final long serialVersionUID = 1L;
+
+        private volatile boolean running = true;
+
+        @Override
+        public void run(SourceContext<Tuple2<String, String>> ctx) throws Exception {
+            for (int i = 0; i < NUM_ELEMENTS && running; i++) {
+                ctx.collect(new Tuple2<>("" + i, "message #" + i));
+            }
+        }
+
+        @Override
+        public void cancel() {
+            running = false;
+        }
+    }
+
+    private static class TestSourceFunctionSortedSet implements SourceFunction<Tuple2<String, String>> {
+        private static final long serialVersionUID = 1L;
+
+        private volatile boolean running = true;
+
+        @Override
+        public void run(SourceContext<Tuple2<String, String>> ctx) throws Exception {
+            for (int i = 0; i < NUM_ELEMENTS && running; i++) {
+                ctx.collect(new Tuple2<>( "message #" + i, "" + i));
+            }
+        }
+
+        @Override
+        public void cancel() {
+            running = false;
+        }
+    }
+
+    public static class RedisCommandMapper implements RedisMapper<Tuple2<String, String>> {
+
+        private RedisCommand redisCommand;
+
+        public RedisCommandMapper(RedisCommand redisCommand){
+            this.redisCommand = redisCommand;
+        }
+
+        @Override
+        public RedisCommandDescription getCommandDescription() {
+            return new RedisCommandDescription(redisCommand);
+        }
+
+        @Override
+        public String getKeyFromData(Tuple2<String, String> data) {
+            return data.f0;
+        }
+
+        @Override
+        public String getValueFromData(Tuple2<String, String> data) {
+            return data.f1;
+        }
+    }
+
+    public static class RedisAdditionalDataMapper implements RedisMapper<Tuple2<String, String>> {
+
+        private RedisCommand redisCommand;
+
+        RedisAdditionalDataMapper(RedisCommand redisCommand){
+            this.redisCommand = redisCommand;
+        }
+
+        @Override
+        public RedisCommandDescription getCommandDescription() {
+            return new RedisCommandDescription(redisCommand, REDIS_ADDITIONAL_KEY);
+        }
+
+        @Override
+        public String getKeyFromData(Tuple2<String, String> data) {
+            return data.f0;
+        }
+
+        @Override
+        public String getValueFromData(Tuple2<String, String> data) {
+            return data.f1;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/b2955a74/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkPublishITCase.java
----------------------------------------------------------------------
diff --git a/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkPublishITCase.java b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkPublishITCase.java
new file mode 100644
index 0000000..06ade83
--- /dev/null
+++ b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkPublishITCase.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
+
+import redis.clients.jedis.JedisPool;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.Before;
+import org.junit.After;
+import org.junit.Test;
+import redis.clients.jedis.JedisPubSub;
+
+import static org.junit.Assert.assertEquals;
+
+public class RedisSinkPublishITCase extends RedisITCaseBase {
+
+    private static final int NUM_ELEMENTS = 20;
+    private static final String REDIS_CHANNEL = "CHANNEL";
+
+    private static final List<String> sourceList = new ArrayList<>();
+    private Thread sinkThread;
+    private PubSub pubSub;
+
+    @Before
+    public void before() throws Exception {
+        pubSub = new PubSub();
+        sinkThread = new Thread(new Subscribe(pubSub));
+    }
+
+    @Test
+    public void redisSinkTest() throws Exception {
+        sinkThread.start();
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        FlinkJedisPoolConfig jedisPoolConfig = new FlinkJedisPoolConfig.Builder()
+            .setHost(REDIS_HOST)
+            .setPort(REDIS_PORT).build();
+        DataStreamSource<Tuple2<String, String>> source = env.addSource(new TestSourceFunction());
+
+        RedisSink<Tuple2<String, String>> redisSink = new RedisSink<>(jedisPoolConfig, new RedisTestMapper());
+
+        source.addSink(redisSink);
+
+        env.execute("Redis Sink Test");
+
+        assertEquals(NUM_ELEMENTS, sourceList.size());
+    }
+
+    @After
+    public void after() throws Exception {
+        pubSub.unsubscribe();
+        sinkThread.join();
+        sourceList.clear();
+    }
+
+    private class Subscribe implements Runnable {
+        private PubSub localPubSub;
+        private Subscribe(PubSub pubSub){
+            this.localPubSub = pubSub;
+        }
+
+        @Override
+        public void run() {
+            JedisPool pool = new JedisPool(REDIS_HOST, REDIS_PORT);
+            pool.getResource().subscribe(localPubSub, REDIS_CHANNEL);
+        }
+    }
+
+    private static class TestSourceFunction implements SourceFunction<Tuple2<String, String>> {
+        private static final long serialVersionUID = 1L;
+
+        private volatile boolean running = true;
+
+        @Override
+        public void run(SourceContext<Tuple2<String, String>> ctx) throws Exception {
+            for (int i = 0; i < NUM_ELEMENTS && running; i++) {
+                ctx.collect(new Tuple2<>(REDIS_CHANNEL, "message #" + i));
+            }
+        }
+
+        @Override
+        public void cancel() {
+            running = false;
+        }
+    }
+
+    public static class PubSub extends JedisPubSub {
+
+        @Override
+        public void onMessage(String channel, String message) {
+            sourceList.add(message);
+        }
+
+    }
+
+    private static class RedisTestMapper implements RedisMapper<Tuple2<String, String>> {
+
+        @Override
+        public RedisCommandDescription getCommandDescription() {
+            return new RedisCommandDescription(RedisCommand.PUBLISH);
+        }
+
+        @Override
+        public String getKeyFromData(Tuple2<String, String> data) {
+            return data.f0;
+        }
+
+        @Override
+        public String getValueFromData(Tuple2<String, String> data) {
+            return data.f1;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/b2955a74/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkTest.java
----------------------------------------------------------------------
diff --git a/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkTest.java b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkTest.java
new file mode 100644
index 0000000..0ec4cd5
--- /dev/null
+++ b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkTest.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisClusterConfig;
+import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisConfigBase;
+import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
+import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisSentinelConfig;
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+import redis.clients.jedis.exceptions.JedisConnectionException;
+
+import java.net.InetSocketAddress;
+import java.util.HashSet;
+import java.util.Set;
+
+
+public class RedisSinkTest extends TestLogger {
+
+    @Test(expected=NullPointerException.class)
+    public void shouldThrowNullPointExceptionIfDataMapperIsNull(){
+        new RedisSink<>(new FlinkJedisClusterConfig.Builder().build(), null);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowNullPointerExceptionIfCommandDescriptionIsNull(){
+        new RedisSink<>(new FlinkJedisClusterConfig.Builder().build(), new TestMapper(null));
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowNullPointerExceptionIfConfigurationIsNull(){
+        new RedisSink<>(null, new TestMapper(new RedisCommandDescription(RedisCommand.LPUSH)));
+    }
+
+    @Test
+    public void testRedisDownBehavior() throws Exception {
+
+        // create a wrong configuration so that open() fails.
+
+        FlinkJedisPoolConfig wrongJedisPoolConfig = new FlinkJedisPoolConfig.Builder()
+            .setHost("127.0.0.1")
+            .setPort(1234).build();
+
+        testDownBehavior(wrongJedisPoolConfig);
+    }
+
+    @Test
+    public void testRedisClusterDownBehavior() throws Exception {
+
+        Set<InetSocketAddress> hosts = new HashSet<>();
+        hosts.add(new InetSocketAddress("127.0.0.1", 1234));
+
+        // create a wrong configuration so that open() fails.
+
+        FlinkJedisClusterConfig wrongJedisClusterConfig = new FlinkJedisClusterConfig.Builder()
+            .setNodes(hosts)
+            .setTimeout(100)
+            .setMaxIdle(1)
+            .setMaxTotal(1)
+            .setMinIdle(1).build();
+
+        testDownBehavior(wrongJedisClusterConfig);
+    }
+
+    @Test
+    public void testRedisSentinelDownBehavior() throws Exception {
+
+        Set<String> hosts = new HashSet<>();
+        hosts.add("localhost:55095");
+
+        // create a wrong configuration so that open() fails.
+
+        FlinkJedisSentinelConfig wrongJedisSentinelConfig = new FlinkJedisSentinelConfig.Builder()
+            .setMasterName("master")
+            .setSentinels(hosts)
+            .build();
+
+        testDownBehavior(wrongJedisSentinelConfig);
+    }
+
+    private void testDownBehavior(FlinkJedisConfigBase config) throws Exception {
+        RedisSink<Tuple2<String, String>> redisSink = new RedisSink<>(config,
+            new RedisSinkITCase.RedisCommandMapper(RedisCommand.SADD));
+
+        try {
+            redisSink.open(new Configuration());
+        } catch (Exception e) {
+
+            // search for nested JedisConnectionExceptions
+            // because this is the expected behavior
+
+            Throwable t = e;
+            int depth = 0;
+            while (!(t instanceof JedisConnectionException)) {
+                t = t.getCause();
+                if (t == null || depth++ == 20) {
+                    throw e;
+                }
+            }
+        }
+    }
+
+    private class TestMapper implements RedisMapper<Tuple2<String, String>> {
+        private RedisCommandDescription redisCommandDescription;
+
+        TestMapper(RedisCommandDescription redisCommandDescription){
+            this.redisCommandDescription = redisCommandDescription;
+        }
+        @Override
+        public RedisCommandDescription getCommandDescription() {
+            return redisCommandDescription;
+        }
+
+        @Override
+        public String getKeyFromData(Tuple2<String, String> data) {
+            return data.f0;
+        }
+
+        @Override
+        public String getValueFromData(Tuple2<String, String> data) {
+            return data.f1;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/b2955a74/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBaseTest.java
----------------------------------------------------------------------
diff --git a/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBaseTest.java b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBaseTest.java
new file mode 100644
index 0000000..2601e40
--- /dev/null
+++ b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBaseTest.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.config;
+
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+
+public class FlinkJedisConfigBaseTest extends TestLogger {
+
+    @Test(expected = IllegalArgumentException.class)
+    public void shouldThrowIllegalArgumentExceptionIfTimeOutIsNegative(){
+        new TestConfig(-1, 0, 0, 0);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void shouldThrowIllegalArgumentExceptionIfMaxTotalIsNegative(){
+        new TestConfig(1, -1, 0, 0);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void shouldThrowIllegalArgumentExceptionIfMaxIdleIsNegative(){
+        new TestConfig(0, 0, -1, 0);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void shouldThrowIllegalArgumentExceptionIfMinIdleIsNegative(){
+        new TestConfig(0, 0, 0, -1);
+    }
+
+    private class TestConfig extends FlinkJedisConfigBase {
+
+        protected TestConfig(int connectionTimeout, int maxTotal, int maxIdle, int minIdle) {
+            super(connectionTimeout, maxTotal, maxIdle, minIdle);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/b2955a74/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisClusterConfigTest.java
----------------------------------------------------------------------
diff --git a/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisClusterConfigTest.java b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisClusterConfigTest.java
new file mode 100644
index 0000000..6d0e787
--- /dev/null
+++ b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisClusterConfigTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.config;
+
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+
+import java.net.InetSocketAddress;
+import java.util.HashSet;
+import java.util.Set;
+
+public class JedisClusterConfigTest extends TestLogger {
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowNullPointExceptionIfNodeValueIsNull(){
+        FlinkJedisClusterConfig.Builder builder = new FlinkJedisClusterConfig.Builder();
+        builder.setMinIdle(0)
+            .setMaxIdle(0)
+            .setMaxTotal(0)
+            .setTimeout(0)
+            .build();
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void shouldThrowIllegalArgumentExceptionIfNodeValuesAreEmpty(){
+        Set<InetSocketAddress> set = new HashSet<>();
+        FlinkJedisClusterConfig.Builder builder = new FlinkJedisClusterConfig.Builder();
+        builder.setMinIdle(0)
+            .setMaxIdle(0)
+            .setMaxTotal(0)
+            .setTimeout(0)
+            .setNodes(set)
+            .build();
+    }
+}

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/b2955a74/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisPoolConfigTest.java
----------------------------------------------------------------------
diff --git a/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisPoolConfigTest.java b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisPoolConfigTest.java
new file mode 100644
index 0000000..bacd660
--- /dev/null
+++ b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisPoolConfigTest.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.config;
+
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+
+public class JedisPoolConfigTest extends TestLogger {
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowNullPointExceptionIfHostValueIsNull(){
+        FlinkJedisPoolConfig.Builder builder = new FlinkJedisPoolConfig.Builder();
+        builder.build();
+    }
+}

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/b2955a74/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisSentinelConfigTest.java
----------------------------------------------------------------------
diff --git a/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisSentinelConfigTest.java b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisSentinelConfigTest.java
new file mode 100644
index 0000000..c6a218a
--- /dev/null
+++ b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisSentinelConfigTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.config;
+
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+
+import java.util.HashSet;
+import java.util.Set;
+
+public class JedisSentinelConfigTest extends TestLogger {
+
+    public static final String MASTER_NAME = "test-master";
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowNullPointExceptionIfMasterValueIsNull(){
+        FlinkJedisSentinelConfig.Builder builder = new FlinkJedisSentinelConfig.Builder();
+        Set<String> sentinels = new HashSet<>();
+        sentinels.add("127.0.0.1");
+        builder.setSentinels(sentinels).build();
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowNullPointExceptionIfSentinelsValueIsNull(){
+        FlinkJedisSentinelConfig.Builder builder = new FlinkJedisSentinelConfig.Builder();
+        builder.setMasterName(MASTER_NAME).build();
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void shouldThrowNullPointExceptionIfSentinelsValueIsEmpty(){
+        FlinkJedisSentinelConfig.Builder builder = new FlinkJedisSentinelConfig.Builder();
+        Set<String> sentinels = new HashSet<>();
+        builder.setMasterName(MASTER_NAME).setSentinels(sentinels).build();
+    }
+}

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/b2955a74/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisDataTypeDescriptionTest.java
----------------------------------------------------------------------
diff --git a/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisDataTypeDescriptionTest.java b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisDataTypeDescriptionTest.java
new file mode 100644
index 0000000..4af0c14
--- /dev/null
+++ b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisDataTypeDescriptionTest.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.mapper;
+
+import org.apache.flink.streaming.connectors.redis.RedisSinkITCase;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+public class RedisDataTypeDescriptionTest extends TestLogger {
+
+    @Test(expected=IllegalArgumentException.class)
+    public void shouldThrowExceptionIfAdditionalKeyIsNotGivenForHashDataType(){
+        RedisSinkITCase.RedisCommandMapper redisCommandMapper = new RedisSinkITCase.RedisCommandMapper(RedisCommand.HSET);
+        redisCommandMapper.getCommandDescription();
+    }
+
+    @Test
+    public void shouldReturnNullForAdditionalDataType(){
+        RedisSinkITCase.RedisCommandMapper redisCommandMapper = new RedisSinkITCase.RedisCommandMapper(RedisCommand.LPUSH);
+        RedisCommandDescription redisDataTypeDescription = redisCommandMapper.getCommandDescription();
+        assertEquals(RedisDataType.LIST, redisDataTypeDescription.getCommand().getRedisDataType());
+        assertNull(redisDataTypeDescription.getAdditionalKey());
+    }
+}

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/b2955a74/flink-connector-redis/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/flink-connector-redis/src/test/resources/log4j.properties b/flink-connector-redis/src/test/resources/log4j.properties
new file mode 100644
index 0000000..c82c2c7
--- /dev/null
+++ b/flink-connector-redis/src/test/resources/log4j.properties
@@ -0,0 +1,27 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# This file ensures that tests executed from the IDE show log output
+
+log4j.rootLogger=INFO, console
+
+# Log all infos in the given file
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.err
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/b2955a74/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index f435d2f..b26982f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -262,11 +262,6 @@
             <argLine>-Xmx3g -Xss4096k -XX:MaxPermSize=${MaxPermGen} -XX:ReservedCodeCacheSize=512m</argLine>
             <systemProperties>
               <log4j.configuration>file:src/test/resources/log4j.properties</log4j.configuration>
-              <derby.system.durability>test</derby.system.durability>
-              <java.awt.headless>true</java.awt.headless>
-              <java.io.tmpdir>${project.build.directory}/tmp</java.io.tmpdir>
-              <!-- Needed by sql/hive tests. -->
-              <test.src.tables>src</test.src.tables>
             </systemProperties>
             <failIfNoTests>false</failIfNoTests>
           </configuration>
@@ -455,10 +450,6 @@
         <version>0.11</version>
       </plugin>
       <plugin>
-        <groupId>net.alchim31.maven</groupId>
-        <artifactId>scala-maven-plugin</artifactId>
-      </plugin>
-      <plugin>
         <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-source-plugin</artifactId>
       </plugin>
@@ -471,8 +462,6 @@
           <failOnViolation>true</failOnViolation>
           <includeTestSourceDirectory>true</includeTestSourceDirectory>
           <failsOnError>false</failsOnError>
-          <sourceDirectories>${basedir}/src/main/java,${basedir}/src/main/scala</sourceDirectories>
-          <testSourceDirectories>${basedir}/src/test/java,${basedir}/examples/src/main/java</testSourceDirectories>
           <configLocation>dev/checkstyle.xml</configLocation>
           <outputFile>${basedir}/target/checkstyle-output.xml</outputFile>
           <encoding>${project.build.sourceEncoding}</encoding>


[5/5] bahir-flink git commit: [BAHIR-55] Add Redis connector from Flink

Posted by lr...@apache.org.
[BAHIR-55] Add Redis connector from Flink

Closes #1


Project: http://git-wip-us.apache.org/repos/asf/bahir-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/bahir-flink/commit/b2955a74
Tree: http://git-wip-us.apache.org/repos/asf/bahir-flink/tree/b2955a74
Diff: http://git-wip-us.apache.org/repos/asf/bahir-flink/diff/b2955a74

Branch: refs/heads/master
Commit: b2955a749e39cab55612917e4d5e702781f1e87c
Parents: 9966a0c
Author: Robert Metzger <rm...@apache.org>
Authored: Fri Aug 19 10:55:30 2016 +0200
Committer: Luciano Resende <lr...@apache.org>
Committed: Fri Aug 19 19:09:27 2016 -0700

----------------------------------------------------------------------
 dev/checkstyle.xml                              |  11 +-
 flink-connector-redis/pom.xml                   |  78 ++++++
 .../streaming/connectors/redis/RedisSink.java   | 188 ++++++++++++++
 .../streaming/connectors/redis/common/Util.java |  25 ++
 .../common/config/FlinkJedisClusterConfig.java  | 188 ++++++++++++++
 .../common/config/FlinkJedisConfigBase.java     |  91 +++++++
 .../common/config/FlinkJedisPoolConfig.java     | 225 ++++++++++++++++
 .../common/config/FlinkJedisSentinelConfig.java | 260 +++++++++++++++++++
 .../common/container/RedisClusterContainer.java | 171 ++++++++++++
 .../container/RedisCommandsContainer.java       | 115 ++++++++
 .../RedisCommandsContainerBuilder.java          | 117 +++++++++
 .../redis/common/container/RedisContainer.java  | 252 ++++++++++++++++++
 .../redis/common/mapper/RedisCommand.java       |  86 ++++++
 .../common/mapper/RedisCommandDescription.java  |  93 +++++++
 .../redis/common/mapper/RedisDataType.java      |  66 +++++
 .../redis/common/mapper/RedisMapper.java        |  66 +++++
 .../connectors/redis/RedisITCaseBase.java       |  45 ++++
 .../redis/RedisSentinelClusterTest.java         |  99 +++++++
 .../connectors/redis/RedisSinkITCase.java       | 233 +++++++++++++++++
 .../redis/RedisSinkPublishITCase.java           | 137 ++++++++++
 .../connectors/redis/RedisSinkTest.java         | 143 ++++++++++
 .../common/config/FlinkJedisConfigBaseTest.java |  50 ++++
 .../common/config/JedisClusterConfigTest.java   |  49 ++++
 .../common/config/JedisPoolConfigTest.java      |  29 +++
 .../common/config/JedisSentinelConfigTest.java  |  49 ++++
 .../mapper/RedisDataTypeDescriptionTest.java    |  41 +++
 .../src/test/resources/log4j.properties         |  27 ++
 pom.xml                                         |  11 -
 28 files changed, 2928 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/b2955a74/dev/checkstyle.xml
----------------------------------------------------------------------
diff --git a/dev/checkstyle.xml b/dev/checkstyle.xml
index 3de6aa9..7a0558c 100644
--- a/dev/checkstyle.xml
+++ b/dev/checkstyle.xml
@@ -41,7 +41,7 @@
 
  -->
 
-<module name = "Checker">
+<module name="Checker">
     <property name="charset" value="UTF-8"/>
 
     <property name="severity" value="error"/>
@@ -78,10 +78,6 @@
             <property name="allowByTailComment" value="true"/>
             <property name="allowNonPrintableEscapes" value="true"/>
         </module>
-        <module name="LineLength">
-            <property name="max" value="100"/>
-            <property name="ignorePattern" value="^package.*|^import.*|a href|href|http://|https://|ftp://"/>
-        </module>
         <module name="NoLineWrap"/>
         <module name="EmptyBlock">
             <property name="option" value="TEXT"/>
@@ -165,7 +161,10 @@
             <property name="exceptionVariableName" value="expected"/>
         </module>
         <module name="CommentsIndentation"/>
-        <module name="UnusedImports"/>
+        <module name="UnusedImports">
+          <!-- Allow imports for JavaDocs -->
+          <property name="processJavadoc" value="true"/>
+        </module>
         <module name="RedundantImport"/>
         <module name="RedundantModifier"/>
     </module>

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/b2955a74/flink-connector-redis/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connector-redis/pom.xml b/flink-connector-redis/pom.xml
new file mode 100644
index 0000000..c34711e
--- /dev/null
+++ b/flink-connector-redis/pom.xml
@@ -0,0 +1,78 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.bahir</groupId>
+        <artifactId>bahir-flink_parent_2.11</artifactId>
+        <version>1.0.0-SNAPSHOT</version>
+        <relativePath>..</relativePath>
+    </parent>
+
+    <artifactId>flink-connector-redis_2.11</artifactId>
+    <name>flink-connector-redis</name>
+
+    <packaging>jar</packaging>
+
+    <properties>
+        <jedis.version>2.8.0</jedis.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-streaming-java_2.11</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>redis.clients</groupId>
+            <artifactId>jedis</artifactId>
+            <version>${jedis.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>com.github.kstyrc</groupId>
+            <artifactId>embedded-redis</artifactId>
+            <version>0.6</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-streaming-java_2.11</artifactId>
+            <version>${flink.version}</version>
+            <scope>test</scope>
+            <type>test-jar</type>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-test-utils_2.11</artifactId>
+            <version>${flink.version}</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/b2955a74/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java
----------------------------------------------------------------------
diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java
new file mode 100644
index 0000000..688f94a
--- /dev/null
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.redis;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisClusterConfig;
+import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisConfigBase;
+import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
+import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisSentinelConfig;
+import org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainer;
+import org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainerBuilder;
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisDataType;
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Objects;
+
+/**
+ * A sink that delivers data to a Redis channel using the Jedis client.
+ * <p> The sink takes two arguments {@link FlinkJedisConfigBase} and {@link RedisMapper}.
+ * <p> When {@link FlinkJedisPoolConfig} is passed as the first argument,
+ * the sink will create connection using {@link redis.clients.jedis.JedisPool}. Please use this when
+ * you want to connect to a single Redis server.
+ * <p> When {@link FlinkJedisSentinelConfig} is passed as the first argument, the sink will create connection
+ * using {@link redis.clients.jedis.JedisSentinelPool}. Please use this when you want to connect to Sentinel.
+ * <p> Please use {@link FlinkJedisClusterConfig} as the first argument if you want to connect to
+ * a Redis Cluster.
+ *
+ * <p>Example:
+ *
+ * <pre>
+ *{@code
+ *public static class RedisExampleMapper implements RedisMapper<Tuple2<String, String>> {
+ *
+ *    private RedisCommand redisCommand;
+ *
+ *    public RedisExampleMapper(RedisCommand redisCommand){
+ *        this.redisCommand = redisCommand;
+ *    }
+ *    public RedisCommandDescription getCommandDescription() {
+ *        return new RedisCommandDescription(redisCommand, REDIS_ADDITIONAL_KEY);
+ *    }
+ *    public String getKeyFromData(Tuple2<String, String> data) {
+ *        return data.f0;
+ *    }
+ *    public String getValueFromData(Tuple2<String, String> data) {
+ *        return data.f1;
+ *    }
+ *}
+ *JedisPoolConfig jedisPoolConfig = new JedisPoolConfig.Builder()
+ *    .setHost(REDIS_HOST).setPort(REDIS_PORT).build();
+ *new RedisSink<String>(jedisPoolConfig, new RedisExampleMapper(RedisCommand.LPUSH));
+ *}</pre>
+ *
+ * @param <IN> Type of the elements emitted by this sink
+ */
+public class RedisSink<IN> extends RichSinkFunction<IN> {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final Logger LOG = LoggerFactory.getLogger(RedisSink.class);
+
+    /**
+     * This additional key needed for {@link RedisDataType#HASH} and {@link RedisDataType#SORTED_SET}.
+     * Other {@link RedisDataType} works only with two variable i.e. name of the list and value to be added.
+     * But for {@link RedisDataType#HASH} and {@link RedisDataType#SORTED_SET} we need three variables.
+     * <p>For {@link RedisDataType#HASH} we need hash name, hash key and element.
+     * {@code additionalKey} used as hash name for {@link RedisDataType#HASH}
+     * <p>For {@link RedisDataType#SORTED_SET} we need set name, the element and it's score.
+     * {@code additionalKey} used as set name for {@link RedisDataType#SORTED_SET}
+     */
+    private String additionalKey;
+    private RedisMapper<IN> redisSinkMapper;
+    private RedisCommand redisCommand;
+
+    private FlinkJedisConfigBase flinkJedisConfigBase;
+    private RedisCommandsContainer redisCommandsContainer;
+
+    /**
+     * Creates a new {@link RedisSink} that connects to the Redis server.
+     *
+     * @param flinkJedisConfigBase The configuration of {@link FlinkJedisConfigBase}
+     * @param redisSinkMapper This is used to generate Redis command and key value from incoming elements.
+     */
+    public RedisSink(FlinkJedisConfigBase flinkJedisConfigBase, RedisMapper<IN> redisSinkMapper) {
+        Objects.requireNonNull(flinkJedisConfigBase, "Redis connection pool config should not be null");
+        Objects.requireNonNull(redisSinkMapper, "Redis Mapper can not be null");
+        Objects.requireNonNull(redisSinkMapper.getCommandDescription(), "Redis Mapper data type description can not be null");
+
+        this.flinkJedisConfigBase = flinkJedisConfigBase;
+
+        this.redisSinkMapper = redisSinkMapper;
+        RedisCommandDescription redisCommandDescription = redisSinkMapper.getCommandDescription();
+        this.redisCommand = redisCommandDescription.getCommand();
+        this.additionalKey = redisCommandDescription.getAdditionalKey();
+    }
+
+    /**
+     * Called when new data arrives to the sink, and forwards it to Redis channel.
+     * Depending on the specified Redis data type (see {@link RedisDataType}),
+     * a different Redis command will be applied.
+     * Available commands are RPUSH, LPUSH, SADD, PUBLISH, SET, PFADD, HSET, ZADD.
+     *
+     * @param input The incoming data
+     */
+    @Override
+    public void invoke(IN input) throws Exception {
+        String key = redisSinkMapper.getKeyFromData(input);
+        String value = redisSinkMapper.getValueFromData(input);
+
+        switch (redisCommand) {
+            case RPUSH:
+                this.redisCommandsContainer.rpush(key, value);
+                break;
+            case LPUSH:
+                this.redisCommandsContainer.lpush(key, value);
+                break;
+            case SADD:
+                this.redisCommandsContainer.sadd(key, value);
+                break;
+            case SET:
+                this.redisCommandsContainer.set(key, value);
+                break;
+            case PFADD:
+                this.redisCommandsContainer.pfadd(key, value);
+                break;
+            case PUBLISH:
+                this.redisCommandsContainer.publish(key, value);
+                break;
+            case ZADD:
+                this.redisCommandsContainer.zadd(this.additionalKey, value, key);
+                break;
+            case HSET:
+                this.redisCommandsContainer.hset(this.additionalKey, key, value);
+                break;
+            default:
+                throw new IllegalArgumentException("Cannot process such data type: " + redisCommand);
+        }
+    }
+
+    /**
+     * Initializes the connection to Redis by either cluster or sentinels or single server.
+     *
+     * @throws IllegalArgumentException if jedisPoolConfig, jedisClusterConfig and jedisSentinelConfig are all null
+     */
+    @Override
+    public void open(Configuration parameters) throws Exception {
+        try {
+            this.redisCommandsContainer = RedisCommandsContainerBuilder.build(this.flinkJedisConfigBase);
+            this.redisCommandsContainer.open();
+        } catch (Exception e) {
+            LOG.error("Redis has not been properly initialized: ", e);
+            throw e;
+        }
+    }
+
+    /**
+     * Closes commands container.
+     * @throws IOException if command container is unable to close.
+     */
+    @Override
+    public void close() throws IOException {
+        if (redisCommandsContainer != null) {
+            redisCommandsContainer.close();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/b2955a74/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/Util.java
----------------------------------------------------------------------
diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/Util.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/Util.java
new file mode 100644
index 0000000..b0e38b9
--- /dev/null
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/Util.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common;
+
+public class Util {
+    public static void checkArgument(boolean condition, String message) {
+        if(!condition) {
+            throw new IllegalArgumentException(message);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/b2955a74/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisClusterConfig.java
----------------------------------------------------------------------
diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisClusterConfig.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisClusterConfig.java
new file mode 100644
index 0000000..119ade3
--- /dev/null
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisClusterConfig.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.redis.common.config;
+
+import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
+import org.apache.flink.streaming.connectors.redis.common.Util;
+import redis.clients.jedis.HostAndPort;
+import redis.clients.jedis.Protocol;
+
+import java.net.InetSocketAddress;
+import java.util.HashSet;
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * Configuration for Jedis cluster.
+ */
+public class FlinkJedisClusterConfig extends FlinkJedisConfigBase {
+    private static final long serialVersionUID = 1L;
+
+    private final Set<InetSocketAddress> nodes;
+    private final int maxRedirections;
+
+
+    /**
+     * Jedis cluster configuration.
+     * The list of node is mandatory, and when nodes is not set, it throws NullPointerException.
+     *
+     * @param nodes list of node information for JedisCluster
+     * @param connectionTimeout socket / connection timeout. The default is 2000
+     * @param maxRedirections limit of redirections-how much we'll follow MOVED or ASK
+     * @param maxTotal the maximum number of objects that can be allocated by the pool
+     * @param maxIdle the cap on the number of "idle" instances in the pool
+     * @param minIdle the minimum number of idle objects to maintain in the pool
+     * @throws NullPointerException if parameter {@code nodes} is {@code null}
+     */
+    private FlinkJedisClusterConfig(Set<InetSocketAddress> nodes, int connectionTimeout, int maxRedirections,
+                                    int maxTotal, int maxIdle, int minIdle) {
+        super(connectionTimeout, maxTotal, maxIdle, minIdle);
+
+        Objects.requireNonNull(nodes, "Node information should be presented");
+        Util.checkArgument(!nodes.isEmpty(), "Redis cluster hosts should not be empty");
+        this.nodes = new HashSet<>(nodes);
+        this.maxRedirections = maxRedirections;
+    }
+
+
+
+    /**
+     * Returns nodes.
+     *
+     * @return list of node information
+     */
+    public Set<HostAndPort> getNodes() {
+        Set<HostAndPort> ret = new HashSet<>();
+        for (InetSocketAddress node : nodes) {
+            ret.add(new HostAndPort(node.getHostName(), node.getPort()));
+        }
+        return ret;
+    }
+
+    /**
+     * Returns limit of redirection.
+     *
+     * @return limit of redirection
+     */
+    public int getMaxRedirections() {
+        return maxRedirections;
+    }
+
+
+    /**
+     * Builder for initializing  {@link FlinkJedisClusterConfig}.
+     */
+    public static class Builder {
+        private Set<InetSocketAddress> nodes;
+        private int timeout = Protocol.DEFAULT_TIMEOUT;
+        private int maxRedirections = 5;
+        private int maxTotal = GenericObjectPoolConfig.DEFAULT_MAX_TOTAL;
+        private int maxIdle = GenericObjectPoolConfig.DEFAULT_MAX_IDLE;
+        private int minIdle = GenericObjectPoolConfig.DEFAULT_MIN_IDLE;
+
+        /**
+         * Sets list of node.
+         *
+         * @param nodes list of node
+         * @return Builder itself
+         */
+        public Builder setNodes(Set<InetSocketAddress> nodes) {
+            this.nodes = nodes;
+            return this;
+        }
+
+        /**
+         * Sets socket / connection timeout.
+         *
+         * @param timeout socket / connection timeout, default value is 2000
+         * @return Builder itself
+         */
+        public Builder setTimeout(int timeout) {
+            this.timeout = timeout;
+            return this;
+        }
+
+        /**
+         * Sets limit of redirection.
+         *
+         * @param maxRedirections limit of redirection, default value is 5
+         * @return Builder itself
+         */
+        public Builder setMaxRedirections(int maxRedirections) {
+            this.maxRedirections = maxRedirections;
+            return this;
+        }
+
+        /**
+         * Sets value for the {@code maxTotal} configuration attribute
+         * for pools to be created with this configuration instance.
+         *
+         * @param maxTotal maxTotal the maximum number of objects that can be allocated by the pool, default value is 8
+         * @return Builder itself
+         */
+        public Builder setMaxTotal(int maxTotal) {
+            this.maxTotal = maxTotal;
+            return this;
+        }
+
+        /**
+         * Sets value for the {@code maxIdle} configuration attribute
+         * for pools to be created with this configuration instance.
+         *
+         * @param maxIdle the cap on the number of "idle" instances in the pool, default value is 8
+         * @return Builder itself
+         */
+        public Builder setMaxIdle(int maxIdle) {
+            this.maxIdle = maxIdle;
+            return this;
+        }
+
+        /**
+         * Sets value for the {@code minIdle} configuration attribute
+         * for pools to be created with this configuration instance.
+         *
+         * @param minIdle the minimum number of idle objects to maintain in the pool, default value is 0
+         * @return Builder itself
+         */
+        public Builder setMinIdle(int minIdle) {
+            this.minIdle = minIdle;
+            return this;
+        }
+
+        /**
+         * Builds JedisClusterConfig.
+         *
+         * @return JedisClusterConfig
+         */
+        public FlinkJedisClusterConfig build() {
+            return new FlinkJedisClusterConfig(nodes, timeout, maxRedirections, maxTotal, maxIdle, minIdle);
+        }
+    }
+
+    @Override
+    public String toString() {
+        return "JedisClusterConfig{" +
+            "nodes=" + nodes +
+            ", timeout=" + connectionTimeout +
+            ", maxRedirections=" + maxRedirections +
+            ", maxTotal=" + maxTotal +
+            ", maxIdle=" + maxIdle +
+            ", minIdle=" + minIdle +
+            '}';
+    }
+}

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/b2955a74/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBase.java
----------------------------------------------------------------------
diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBase.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBase.java
new file mode 100644
index 0000000..0d821ed
--- /dev/null
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBase.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.config;
+
+import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
+import org.apache.flink.streaming.connectors.redis.common.Util;
+
+import java.io.Serializable;
+
+/**
+ * Base class for Flink Redis configuration.
+ */
+public abstract class FlinkJedisConfigBase implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    protected final int maxTotal;
+    protected final int maxIdle;
+    protected final int minIdle;
+    protected final int connectionTimeout;
+
+    protected FlinkJedisConfigBase(int connectionTimeout, int maxTotal, int maxIdle, int minIdle){
+        Util.checkArgument(connectionTimeout >= 0, "connection timeout can not be negative");
+        Util.checkArgument(maxTotal >= 0, "maxTotal value can not be negative");
+        Util.checkArgument(maxIdle >= 0, "maxIdle value can not be negative");
+        Util.checkArgument(minIdle >= 0, "minIdle value can not be negative");
+
+        this.connectionTimeout = connectionTimeout;
+        this.maxTotal = maxTotal;
+        this.maxIdle = maxIdle;
+        this.minIdle = minIdle;
+    }
+
+    /**
+     * Returns timeout.
+     *
+     * @return connection timeout
+     */
+    public int getConnectionTimeout() {
+        return connectionTimeout;
+    }
+
+    /**
+     * Get the value for the {@code maxTotal} configuration attribute
+     * for pools to be created with this configuration instance.
+     *
+     * @return  The current setting of {@code maxTotal} for this
+     *          configuration instance
+     * @see GenericObjectPoolConfig#getMaxTotal()
+     */
+    public int getMaxTotal() {
+        return maxTotal;
+    }
+
+    /**
+     * Get the value for the {@code maxIdle} configuration attribute
+     * for pools to be created with this configuration instance.
+     *
+     * @return  The current setting of {@code maxIdle} for this
+     *          configuration instance
+     * @see GenericObjectPoolConfig#getMaxIdle()
+     */
+    public int getMaxIdle() {
+        return maxIdle;
+    }
+
+    /**
+     * Get the value for the {@code minIdle} configuration attribute
+     * for pools to be created with this configuration instance.
+     *
+     * @return  The current setting of {@code minIdle} for this
+     *          configuration instance
+     * @see GenericObjectPoolConfig#getMinIdle()
+     */
+    public int getMinIdle() {
+        return minIdle;
+    }
+}

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/b2955a74/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisPoolConfig.java
----------------------------------------------------------------------
diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisPoolConfig.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisPoolConfig.java
new file mode 100644
index 0000000..d4c30ff
--- /dev/null
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisPoolConfig.java
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.config;
+
+import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
+import redis.clients.jedis.Protocol;
+
+import java.util.Objects;
+
+/**
+ * Configuration for Jedis pool.
+ */
+public class FlinkJedisPoolConfig extends FlinkJedisConfigBase {
+
+    private static final long serialVersionUID = 1L;
+
+    private final String host;
+    private final int port;
+    private final int database;
+    private final String password;
+
+
+    /**
+     * Jedis pool configuration.
+     * The host is mandatory, and when host is not set, it throws NullPointerException.
+     *
+     * @param host hostname or IP
+     * @param port port, default value is 6379
+     * @param connectionTimeout socket / connection timeout, default value is 2000 milli second
+     * @param password password, if any
+     * @param database database index
+     * @param maxTotal the maximum number of objects that can be allocated by the pool, default value is 8
+     * @param maxIdle the cap on the number of "idle" instances in the pool, default value is 8
+     * @param minIdle the minimum number of idle objects to maintain in the pool, default value is 0
+     * @throws NullPointerException if parameter {@code host} is {@code null}
+     */
+    private FlinkJedisPoolConfig(String host, int port, int connectionTimeout, String password, int database,
+                                int maxTotal, int maxIdle, int minIdle) {
+        super(connectionTimeout, maxTotal, maxIdle, minIdle);
+        Objects.requireNonNull(host, "Host information should be presented");
+        this.host = host;
+        this.port = port;
+        this.database = database;
+        this.password = password;
+    }
+
+    /**
+     * Returns host.
+     *
+     * @return hostname or IP
+     */
+    public String getHost() {
+        return host;
+    }
+
+    /**
+     * Returns port.
+     *
+     * @return port
+     */
+    public int getPort() {
+        return port;
+    }
+
+
+    /**
+     * Returns database index.
+     *
+     * @return database index
+     */
+    public int getDatabase() {
+        return database;
+    }
+
+    /**
+     * Returns password.
+     *
+     * @return password
+     */
+    public String getPassword() {
+        return password;
+    }
+
+    /**
+     * Builder for initializing  {@link FlinkJedisPoolConfig}.
+     */
+    public static class Builder {
+        private String host;
+        private int port = Protocol.DEFAULT_PORT;
+        private int timeout = Protocol.DEFAULT_TIMEOUT;
+        private int database = Protocol.DEFAULT_DATABASE;
+        private String password;
+        private int maxTotal = GenericObjectPoolConfig.DEFAULT_MAX_TOTAL;
+        private int maxIdle = GenericObjectPoolConfig.DEFAULT_MAX_IDLE;
+        private int minIdle = GenericObjectPoolConfig.DEFAULT_MIN_IDLE;
+
+        /**
+         * Sets value for the {@code maxTotal} configuration attribute
+         * for pools to be created with this configuration instance.
+         *
+         * @param maxTotal maxTotal the maximum number of objects that can be allocated by the pool, default value is 8
+         * @return Builder itself
+         */
+        public Builder setMaxTotal(int maxTotal) {
+            this.maxTotal = maxTotal;
+            return this;
+        }
+
+        /**
+         * Sets value for the {@code maxIdle} configuration attribute
+         * for pools to be created with this configuration instance.
+         *
+         * @param maxIdle the cap on the number of "idle" instances in the pool, default value is 8
+         * @return Builder itself
+         */
+        public Builder setMaxIdle(int maxIdle) {
+            this.maxIdle = maxIdle;
+            return this;
+        }
+
+        /**
+         * Sets value for the {@code minIdle} configuration attribute
+         * for pools to be created with this configuration instance.
+         *
+         * @param minIdle the minimum number of idle objects to maintain in the pool, default value is 0
+         * @return Builder itself
+         */
+        public Builder setMinIdle(int minIdle) {
+            this.minIdle = minIdle;
+            return this;
+        }
+
+        /**
+         * Sets host.
+         *
+         * @param host host
+         * @return Builder itself
+         */
+        public Builder setHost(String host) {
+            this.host = host;
+            return this;
+        }
+
+        /**
+         * Sets port.
+         *
+         * @param port port, default value is 6379
+         * @return Builder itself
+         */
+        public Builder setPort(int port) {
+            this.port = port;
+            return this;
+        }
+
+        /**
+         * Sets timeout.
+         *
+         * @param timeout timeout, default value is 2000
+         * @return Builder itself
+         */
+        public Builder setTimeout(int timeout) {
+            this.timeout = timeout;
+            return this;
+        }
+
+        /**
+         * Sets database index.
+         *
+         * @param database database index, default value is 0
+         * @return Builder itself
+         */
+        public Builder setDatabase(int database) {
+            this.database = database;
+            return this;
+        }
+
+        /**
+         * Sets password.
+         *
+         * @param password password, if any
+         * @return Builder itself
+         */
+        public Builder setPassword(String password) {
+            this.password = password;
+            return this;
+        }
+
+
+        /**
+         * Builds JedisPoolConfig.
+         *
+         * @return JedisPoolConfig
+         */
+        public FlinkJedisPoolConfig build() {
+            return new FlinkJedisPoolConfig(host, port, timeout, password, database, maxTotal, maxIdle, minIdle);
+        }
+    }
+
+    @Override
+    public String toString() {
+        return "JedisPoolConfig{" +
+            "host='" + host + '\'' +
+            ", port=" + port +
+            ", timeout=" + connectionTimeout +
+            ", database=" + database +
+            ", maxTotal=" + maxTotal +
+            ", maxIdle=" + maxIdle +
+            ", minIdle=" + minIdle +
+            '}';
+    }
+}

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/b2955a74/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisSentinelConfig.java
----------------------------------------------------------------------
diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisSentinelConfig.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisSentinelConfig.java
new file mode 100644
index 0000000..6058a53
--- /dev/null
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisSentinelConfig.java
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.config;
+
+import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
+import org.apache.flink.streaming.connectors.redis.common.Util;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import redis.clients.jedis.Protocol;
+
+import java.util.HashSet;
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * Configuration for Jedis Sentinel pool.
+ */
+public class FlinkJedisSentinelConfig extends FlinkJedisConfigBase {
+    private static final long serialVersionUID = 1L;
+
+    private static final Logger LOG = LoggerFactory.getLogger(FlinkJedisSentinelConfig.class);
+
+    private final String masterName;
+    private final Set<String> sentinels;
+    private final int soTimeout;
+    private final String password;
+    private final int database;
+
+    /**
+     * Jedis Sentinels config.
+     * The master name and sentinels are mandatory, and when you didn't set these, it throws NullPointerException.
+     *
+     * @param masterName master name of the replica set
+     * @param sentinels set of sentinel hosts
+     * @param connectionTimeout timeout connection timeout
+     * @param soTimeout timeout socket timeout
+     * @param password password, if any
+     * @param database database database index
+     * @param maxTotal maxTotal the maximum number of objects that can be allocated by the pool
+     * @param maxIdle the cap on the number of "idle" instances in the pool
+     * @param minIdle the minimum number of idle objects to maintain in the pool
+     *
+     * @throws NullPointerException if {@code masterName} or {@code sentinels} is {@code null}
+     * @throws IllegalArgumentException if {@code sentinels} are empty
+     */
+    private FlinkJedisSentinelConfig(String masterName, Set<String> sentinels,
+                                    int connectionTimeout, int soTimeout,
+                                    String password, int database,
+                                    int maxTotal, int maxIdle, int minIdle) {
+        super(connectionTimeout, maxTotal, maxIdle, minIdle);
+        Objects.requireNonNull(masterName, "Master name should be presented");
+        Objects.requireNonNull(sentinels, "Sentinels information should be presented");
+        Util.checkArgument(!sentinels.isEmpty(), "Sentinel hosts should not be empty");
+
+        this.masterName = masterName;
+        this.sentinels = new HashSet<>(sentinels);
+        this.soTimeout = soTimeout;
+        this.password = password;
+        this.database = database;
+    }
+
+    /**
+     * Returns master name of the replica set.
+     *
+     * @return master name of the replica set.
+     */
+    public String getMasterName() {
+        return masterName;
+    }
+
+    /**
+     * Returns Sentinels host addresses.
+     *
+     * @return Set of Sentinels host addresses
+     */
+    public Set<String> getSentinels() {
+        return sentinels;
+    }
+
+    /**
+     * Returns socket timeout.
+     *
+     * @return socket timeout
+     */
+    public int getSoTimeout() {
+        return soTimeout;
+    }
+
+    /**
+     * Returns password.
+     *
+     * @return password
+     */
+    public String getPassword() {
+        return password;
+    }
+
+    /**
+     * Returns database index.
+     *
+     * @return database index
+     */
+    public int getDatabase() {
+        return database;
+    }
+
+    /**
+     * Builder for initializing {@link FlinkJedisSentinelConfig}.
+     */
+    public static class Builder {
+        private String masterName;
+        private Set<String> sentinels;
+        private int connectionTimeout = Protocol.DEFAULT_TIMEOUT;
+        private int soTimeout = Protocol.DEFAULT_TIMEOUT;
+        private String password;
+        private int database = Protocol.DEFAULT_DATABASE;
+        private int maxTotal = GenericObjectPoolConfig.DEFAULT_MAX_TOTAL;
+        private int maxIdle = GenericObjectPoolConfig.DEFAULT_MAX_IDLE;
+        private int minIdle = GenericObjectPoolConfig.DEFAULT_MIN_IDLE;
+
+        /**
+         * Sets master name of the replica set.
+         *
+         * @param masterName  master name of the replica set
+         * @return Builder itself
+         */
+        public Builder setMasterName(String masterName) {
+            this.masterName = masterName;
+            return this;
+        }
+
+        /**
+         * Sets sentinels address.
+         *
+         * @param sentinels host set of the sentinels
+         * @return Builder itself
+         */
+        public Builder setSentinels(Set<String> sentinels) {
+            this.sentinels = sentinels;
+            return this;
+        }
+
+        /**
+         * Sets connection timeout.
+         *
+         * @param connectionTimeout connection timeout, default value is 2000
+         * @return Builder itself
+         */
+        public Builder setConnectionTimeout(int connectionTimeout) {
+            this.connectionTimeout = connectionTimeout;
+            return this;
+        }
+
+        /**
+         * Sets socket timeout.
+         *
+         * @param soTimeout socket timeout, default value is 2000
+         * @return Builder itself
+         */
+        public Builder setSoTimeout(int soTimeout) {
+            this.soTimeout = soTimeout;
+            return this;
+        }
+
+        /**
+         * Sets password.
+         *
+         * @param password password, if any
+         * @return Builder itself
+         */
+        public Builder setPassword(String password) {
+            this.password = password;
+            return this;
+        }
+
+        /**
+         * Sets database index.
+         *
+         * @param database database index, default value is 0
+         * @return Builder itself
+         */
+        public Builder setDatabase(int database) {
+            this.database = database;
+            return this;
+        }
+
+        /**
+         * Sets value for the {@code maxTotal} configuration attribute
+         * for pools to be created with this configuration instance.
+         *
+         * @param maxTotal maxTotal the maximum number of objects that can be allocated by the pool, default value is 8
+         * @return Builder itself
+         */
+        public Builder setMaxTotal(int maxTotal) {
+            this.maxTotal = maxTotal;
+            return this;
+        }
+
+        /**
+         * Sets value for the {@code maxIdle} configuration attribute
+         * for pools to be created with this configuration instance.
+         *
+         * @param maxIdle the cap on the number of "idle" instances in the pool, default value is 8
+         * @return Builder itself
+         */
+        public Builder setMaxIdle(int maxIdle) {
+            this.maxIdle = maxIdle;
+            return this;
+        }
+
+        /**
+         * Sets value for the {@code minIdle} configuration attribute
+         * for pools to be created with this configuration instance.
+         *
+         * @param minIdle the minimum number of idle objects to maintain in the pool, default value is 0
+         * @return Builder itself
+         */
+        public Builder setMinIdle(int minIdle) {
+            this.minIdle = minIdle;
+            return this;
+        }
+
+        /**
+         * Builds JedisSentinelConfig.
+         *
+         * @return JedisSentinelConfig
+         */
+        public FlinkJedisSentinelConfig build(){
+            return new FlinkJedisSentinelConfig(masterName, sentinels, connectionTimeout, soTimeout,
+                password, database, maxTotal, maxIdle, minIdle);
+        }
+    }
+
+    @Override
+    public String toString() {
+        return "JedisSentinelConfig{" +
+            "masterName='" + masterName + '\'' +
+            ", connectionTimeout=" + connectionTimeout +
+            ", soTimeout=" + soTimeout +
+            ", database=" + database +
+            ", maxTotal=" + maxTotal +
+            ", maxIdle=" + maxIdle +
+            ", minIdle=" + minIdle +
+            '}';
+    }
+}

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/b2955a74/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java
----------------------------------------------------------------------
diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java
new file mode 100644
index 0000000..cc1d626
--- /dev/null
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.container;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import redis.clients.jedis.JedisCluster;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Objects;
+
+/**
+ * Redis command container if we want to connect to a Redis cluster.
+ */
+public class RedisClusterContainer implements RedisCommandsContainer, Closeable {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final Logger LOG = LoggerFactory.getLogger(RedisClusterContainer.class);
+
+    private transient JedisCluster jedisCluster;
+
+    /**
+     * Initialize Redis command container for Redis cluster.
+     *
+     * @param jedisCluster JedisCluster instance
+     */
+    public RedisClusterContainer(JedisCluster jedisCluster) {
+        Objects.requireNonNull(jedisCluster, "Jedis cluster can not be null");
+
+        this.jedisCluster = jedisCluster;
+    }
+
+    @Override
+    public void open() throws Exception {
+
+        // echo() tries to open a connection and echos back the
+        // message passed as argument. Here we use it to monitor
+        // if we can communicate with the cluster.
+
+        jedisCluster.echo("Test");
+    }
+
+    @Override
+    public void hset(final String key, final String hashField, final String value) {
+        try {
+            jedisCluster.hset(key, hashField, value);
+        } catch (Exception e) {
+            if (LOG.isErrorEnabled()) {
+                LOG.error("Cannot send Redis message with command HSET to hash {} error message {}",
+                    key, hashField, e.getMessage());
+            }
+            throw e;
+        }
+    }
+
+    @Override
+    public void rpush(final String listName, final String value) {
+        try {
+            jedisCluster.rpush(listName, value);
+        } catch (Exception e) {
+            if (LOG.isErrorEnabled()) {
+                LOG.error("Cannot send Redis message with command RPUSH to list {} error message: {}",
+                    listName, e.getMessage());
+            }
+            throw e;
+        }
+    }
+
+    @Override
+    public void lpush(String listName, String value) {
+        try {
+            jedisCluster.lpush(listName, value);
+        } catch (Exception e) {
+            if (LOG.isErrorEnabled()) {
+                LOG.error("Cannot send Redis message with command LPUSH to list {} error message: {}",
+                    listName, e.getMessage());
+            }
+            throw e;
+        }
+    }
+
+    @Override
+    public void sadd(final String setName, final String value) {
+        try {
+            jedisCluster.sadd(setName, value);
+        } catch (Exception e) {
+            if (LOG.isErrorEnabled()) {
+                LOG.error("Cannot send Redis message with command RPUSH to set {} error message {}",
+                    setName, e.getMessage());
+            }
+            throw e;
+        }
+    }
+
+    @Override
+    public void publish(final String channelName, final String message) {
+        try {
+            jedisCluster.publish(channelName, message);
+        } catch (Exception e) {
+            if (LOG.isErrorEnabled()) {
+                LOG.error("Cannot send Redis message with command PUBLISH to channel {} error message {}",
+                    channelName, e.getMessage());
+            }
+            throw e;
+        }
+    }
+
+    @Override
+    public void set(final String key, final String value) {
+        try {
+            jedisCluster.set(key, value);
+        } catch (Exception e) {
+            if (LOG.isErrorEnabled()) {
+                LOG.error("Cannot send Redis message with command SET to key {} error message {}",
+                    key, e.getMessage());
+            }
+            throw e;
+        }
+    }
+
+    @Override
+    public void pfadd(final String key, final String element) {
+        try {
+            jedisCluster.set(key, element);
+        } catch (Exception e) {
+            if (LOG.isErrorEnabled()) {
+                LOG.error("Cannot send Redis message with command PFADD to key {} error message {}",
+                    key, e.getMessage());
+            }
+            throw e;
+        }
+    }
+
+    @Override
+    public void zadd(final String key, final String score, final String element) {
+        try {
+            jedisCluster.zadd(key, Double.valueOf(score), element);
+        } catch (Exception e) {
+            if (LOG.isErrorEnabled()) {
+                LOG.error("Cannot send Redis message with command ZADD to set {} error message {}",
+                    key, e.getMessage());
+            }
+            throw e;
+        }
+    }
+
+    /**
+     * Closes the {@link JedisCluster}.
+     */
+    @Override
+    public void close() throws IOException {
+        this.jedisCluster.close();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/b2955a74/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainer.java
----------------------------------------------------------------------
diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainer.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainer.java
new file mode 100644
index 0000000..78771f1
--- /dev/null
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainer.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.container;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+/**
+ * The container for all available Redis commands.
+ */
+public interface RedisCommandsContainer extends Serializable {
+
+    /**
+     * Open the Jedis container.
+     *
+     * @throws Exception if the instance can not be opened properly
+     */
+    void open() throws Exception;
+
+    /**
+     * Sets field in the hash stored at key to value.
+     * If key does not exist, a new key holding a hash is created.
+     * If field already exists in the hash, it is overwritten.
+     *
+     * @param key Hash name
+     * @param hashField Hash field
+     * @param value Hash value
+     */
+    void hset(String key, String hashField, String value);
+
+    /**
+     * Insert the specified value at the tail of the list stored at key.
+     * If key does not exist, it is created as empty list before performing the push operation.
+     *
+     * @param listName Name of the List
+     * @param value  Value to be added
+     */
+    void rpush(String listName, String value);
+
+    /**
+     * Insert the specified value at the head of the list stored at key.
+     * If key does not exist, it is created as empty list before performing the push operation.
+     *
+     * @param listName Name of the List
+     * @param value  Value to be added
+     */
+    void lpush(String listName, String value);
+
+    /**
+     * Add the specified member to the set stored at key.
+     * Specified members that are already a member of this set are ignored.
+     * If key does not exist, a new set is created before adding the specified members.
+     *
+     * @param setName Name of the Set
+     * @param value Value to be added
+     */
+    void sadd(String setName, String value);
+
+    /**
+     * Posts a message to the given channel.
+     *
+     * @param channelName Name of the channel to which data will be published
+     * @param message the message
+     */
+    void publish(String channelName, String message);
+
+    /**
+     * Set key to hold the string value. If key already holds a value, it is overwritten,
+     * regardless of its type. Any previous time to live associated with the key is
+     * discarded on successful SET operation.
+     *
+     * @param key the key name in which value to be set
+     * @param value the value
+     */
+    void set(String key, String value);
+
+    /**
+     * Adds all the element arguments to the HyperLogLog data structure
+     * stored at the variable name specified as first argument.
+     *
+     * @param key The name of the key
+     * @param element the element
+     */
+    void pfadd(String key, String element);
+
+    /**
+     * Adds the specified member with the specified scores to the sorted set stored at key.
+     *
+     * @param key The name of the Sorted Set
+     * @param score Score of the element
+     * @param element  element to be added
+     */
+    void zadd(String key, String score, String element);
+
+    /**
+     * Close the Jedis container.
+     *
+     * @throws IOException if the instance can not be closed properly
+     */
+    void close() throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/b2955a74/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilder.java
----------------------------------------------------------------------
diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilder.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilder.java
new file mode 100644
index 0000000..0db5b05
--- /dev/null
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilder.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.container;
+
+import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
+import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisClusterConfig;
+import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisConfigBase;
+import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
+import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisSentinelConfig;
+import redis.clients.jedis.JedisCluster;
+import redis.clients.jedis.JedisPool;
+import redis.clients.jedis.JedisSentinelPool;
+
+import java.util.Objects;
+
+/**
+ * The builder for {@link RedisCommandsContainer}.
+ */
+public class RedisCommandsContainerBuilder {
+
+    /**
+     * Initialize the {@link RedisCommandsContainer} based on the instance type.
+     * @param flinkJedisConfigBase configuration base
+     * @return @throws IllegalArgumentException if jedisPoolConfig, jedisClusterConfig and jedisSentinelConfig are all null
+     */
+    public static RedisCommandsContainer build(FlinkJedisConfigBase flinkJedisConfigBase){
+        if(flinkJedisConfigBase instanceof FlinkJedisPoolConfig){
+            FlinkJedisPoolConfig flinkJedisPoolConfig = (FlinkJedisPoolConfig) flinkJedisConfigBase;
+            return RedisCommandsContainerBuilder.build(flinkJedisPoolConfig);
+        } else if (flinkJedisConfigBase instanceof FlinkJedisClusterConfig) {
+            FlinkJedisClusterConfig flinkJedisClusterConfig = (FlinkJedisClusterConfig) flinkJedisConfigBase;
+            return RedisCommandsContainerBuilder.build(flinkJedisClusterConfig);
+        } else if (flinkJedisConfigBase instanceof FlinkJedisSentinelConfig) {
+            FlinkJedisSentinelConfig flinkJedisSentinelConfig = (FlinkJedisSentinelConfig) flinkJedisConfigBase;
+            return RedisCommandsContainerBuilder.build(flinkJedisSentinelConfig);
+        } else {
+            throw new IllegalArgumentException("Jedis configuration not found");
+        }
+    }
+
+    /**
+     * Builds container for single Redis environment.
+     *
+     * @param jedisPoolConfig configuration for JedisPool
+     * @return container for single Redis environment
+     * @throws NullPointerException if jedisPoolConfig is null
+     */
+    public static RedisCommandsContainer build(FlinkJedisPoolConfig jedisPoolConfig) {
+        Objects.requireNonNull(jedisPoolConfig, "Redis pool config should not be Null");
+
+        GenericObjectPoolConfig genericObjectPoolConfig = new GenericObjectPoolConfig();
+        genericObjectPoolConfig.setMaxIdle(jedisPoolConfig.getMaxIdle());
+        genericObjectPoolConfig.setMaxTotal(jedisPoolConfig.getMaxTotal());
+        genericObjectPoolConfig.setMinIdle(jedisPoolConfig.getMinIdle());
+
+        JedisPool jedisPool = new JedisPool(genericObjectPoolConfig, jedisPoolConfig.getHost(),
+            jedisPoolConfig.getPort(), jedisPoolConfig.getConnectionTimeout(), jedisPoolConfig.getPassword(),
+            jedisPoolConfig.getDatabase());
+        return new RedisContainer(jedisPool);
+    }
+
+    /**
+     * Builds container for Redis Cluster environment.
+     *
+     * @param jedisClusterConfig configuration for JedisCluster
+     * @return container for Redis Cluster environment
+     * @throws NullPointerException if jedisClusterConfig is null
+     */
+    public static RedisCommandsContainer build(FlinkJedisClusterConfig jedisClusterConfig) {
+        Objects.requireNonNull(jedisClusterConfig, "Redis cluster config should not be Null");
+
+        GenericObjectPoolConfig genericObjectPoolConfig = new GenericObjectPoolConfig();
+        genericObjectPoolConfig.setMaxIdle(jedisClusterConfig.getMaxIdle());
+        genericObjectPoolConfig.setMaxTotal(jedisClusterConfig.getMaxTotal());
+        genericObjectPoolConfig.setMinIdle(jedisClusterConfig.getMinIdle());
+
+        JedisCluster jedisCluster = new JedisCluster(jedisClusterConfig.getNodes(), jedisClusterConfig.getConnectionTimeout(),
+            jedisClusterConfig.getMaxRedirections(), genericObjectPoolConfig);
+        return new RedisClusterContainer(jedisCluster);
+    }
+
+    /**
+     * Builds container for Redis Sentinel environment.
+     *
+     * @param jedisSentinelConfig configuration for JedisSentinel
+     * @return container for Redis sentinel environment
+     * @throws NullPointerException if jedisSentinelConfig is null
+     */
+    public static RedisCommandsContainer build(FlinkJedisSentinelConfig jedisSentinelConfig) {
+        Objects.requireNonNull(jedisSentinelConfig, "Redis sentinel config should not be Null");
+
+        GenericObjectPoolConfig genericObjectPoolConfig = new GenericObjectPoolConfig();
+        genericObjectPoolConfig.setMaxIdle(jedisSentinelConfig.getMaxIdle());
+        genericObjectPoolConfig.setMaxTotal(jedisSentinelConfig.getMaxTotal());
+        genericObjectPoolConfig.setMinIdle(jedisSentinelConfig.getMinIdle());
+
+        JedisSentinelPool jedisSentinelPool = new JedisSentinelPool(jedisSentinelConfig.getMasterName(),
+            jedisSentinelConfig.getSentinels(), genericObjectPoolConfig,
+            jedisSentinelConfig.getConnectionTimeout(), jedisSentinelConfig.getSoTimeout(),
+            jedisSentinelConfig.getPassword(), jedisSentinelConfig.getDatabase());
+        return new RedisContainer(jedisSentinelPool);
+    }
+}

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/b2955a74/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java
----------------------------------------------------------------------
diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java
new file mode 100644
index 0000000..fb73a27
--- /dev/null
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.container;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import redis.clients.jedis.Jedis;
+import redis.clients.jedis.JedisPool;
+import redis.clients.jedis.JedisSentinelPool;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Objects;
+
+/**
+ * Redis command container if we want to connect to a single Redis server or to Redis sentinels
+ * If want to connect to a single Redis server, please use the first constructor {@link #RedisContainer(JedisPool)}.
+ * If want to connect to a Redis sentinels, please use the second constructor {@link #RedisContainer(JedisSentinelPool)}
+ */
+public class RedisContainer implements RedisCommandsContainer, Closeable {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final Logger LOG = LoggerFactory.getLogger(RedisContainer.class);
+
+    private transient JedisPool jedisPool;
+    private transient JedisSentinelPool jedisSentinelPool;
+
+    /**
+     * Use this constructor if to connect with single Redis server.
+     *
+     * @param jedisPool JedisPool which actually manages Jedis instances
+     */
+    public RedisContainer(JedisPool jedisPool) {
+        Objects.requireNonNull(jedisPool, "Jedis Pool can not be null");
+        this.jedisPool = jedisPool;
+        this.jedisSentinelPool = null;
+    }
+
+    /**
+     * Use this constructor if Redis environment is clustered with sentinels.
+     *
+     * @param sentinelPool SentinelPool which actually manages Jedis instances
+     */
+    public RedisContainer(final JedisSentinelPool sentinelPool) {
+        Objects.requireNonNull(sentinelPool, "Jedis Sentinel Pool can not be null");
+        this.jedisPool = null;
+        this.jedisSentinelPool = sentinelPool;
+    }
+
+    /**
+     * Closes the Jedis instances.
+     */
+    @Override
+    public void close() throws IOException {
+        if (this.jedisPool != null) {
+            this.jedisPool.close();
+        }
+        if (this.jedisSentinelPool != null) {
+            this.jedisSentinelPool.close();
+        }
+    }
+
+    @Override
+    public void open() throws Exception {
+
+        // echo() tries to open a connection and echos back the
+        // message passed as argument. Here we use it to monitor
+        // if we can communicate with the cluster.
+
+        getInstance().echo("Test");
+    }
+
+    @Override
+    public void hset(final String key, final String hashField, final String value) {
+        Jedis jedis = null;
+        try {
+            jedis = getInstance();
+            jedis.hset(key, hashField, value);
+        } catch (Exception e) {
+            if (LOG.isErrorEnabled()) {
+                LOG.error("Cannot send Redis message with command HSET to key {} and hashField {} error message {}",
+                    key, hashField, e.getMessage());
+            }
+            throw e;
+        } finally {
+            releaseInstance(jedis);
+        }
+    }
+
+    @Override
+    public void rpush(final String listName, final String value) {
+        Jedis jedis = null;
+        try {
+            jedis = getInstance();
+            jedis.rpush(listName, value);
+        } catch (Exception e) {
+            if (LOG.isErrorEnabled()) {
+                LOG.error("Cannot send Redis message with command RPUSH to list {} error message {}",
+                    listName, e.getMessage());
+            }
+            throw e;
+        } finally {
+            releaseInstance(jedis);
+        }
+    }
+
+    @Override
+    public void lpush(String listName, String value) {
+        Jedis jedis = null;
+        try {
+            jedis = getInstance();
+            jedis.lpush(listName, value);
+        } catch (Exception e) {
+            if (LOG.isErrorEnabled()) {
+                LOG.error("Cannot send Redis message with command LUSH to list {} error message {}",
+                    listName, e.getMessage());
+            }
+            throw e;
+        } finally {
+            releaseInstance(jedis);
+        }
+    }
+
+    @Override
+    public void sadd(final String setName, final String value) {
+        Jedis jedis = null;
+        try {
+            jedis = getInstance();
+            jedis.sadd(setName, value);
+        } catch (Exception e) {
+            if (LOG.isErrorEnabled()) {
+                LOG.error("Cannot send Redis message with command RPUSH to set {} error message {}",
+                    setName, e.getMessage());
+            }
+            throw e;
+        } finally {
+            releaseInstance(jedis);
+        }
+    }
+
+    @Override
+    public void publish(final String channelName, final String message) {
+        Jedis jedis = null;
+        try {
+            jedis = getInstance();
+            jedis.publish(channelName, message);
+        } catch (Exception e) {
+            if (LOG.isErrorEnabled()) {
+                LOG.error("Cannot send Redis message with command PUBLISH to channel {} error message {}",
+                    channelName, e.getMessage());
+            }
+            throw e;
+        } finally {
+            releaseInstance(jedis);
+        }
+    }
+
+    @Override
+    public void set(final String key, final String value) {
+        Jedis jedis = null;
+        try {
+            jedis = getInstance();
+            jedis.set(key, value);
+        } catch (Exception e) {
+            if (LOG.isErrorEnabled()) {
+                LOG.error("Cannot send Redis message with command SET to key {} error message {}",
+                    key, e.getMessage());
+            }
+            throw e;
+        } finally {
+            releaseInstance(jedis);
+        }
+    }
+
+    @Override
+    public void pfadd(final String key, final String element) {
+        Jedis jedis = null;
+        try {
+            jedis = getInstance();
+            jedis.pfadd(key, element);
+        } catch (Exception e) {
+            if (LOG.isErrorEnabled()) {
+                LOG.error("Cannot send Redis message with command PFADD to key {} error message {}",
+                    key, e.getMessage());
+            }
+            throw e;
+        } finally {
+            releaseInstance(jedis);
+        }
+    }
+
+    @Override
+    public void zadd(final String key, final String score, final String element) {
+        Jedis jedis = null;
+        try {
+            jedis = getInstance();
+            jedis.zadd(key, Double.valueOf(score), element);
+        } catch (Exception e) {
+            if (LOG.isErrorEnabled()) {
+                LOG.error("Cannot send Redis message with command ZADD to set {} error message {}",
+                    key, e.getMessage());
+            }
+            throw e;
+        } finally {
+            releaseInstance(jedis);
+        }
+    }
+
+    /**
+     * Returns Jedis instance from the pool.
+     *
+     * @return the Jedis instance
+     */
+    private Jedis getInstance() {
+        if (jedisSentinelPool != null) {
+            return jedisSentinelPool.getResource();
+        } else {
+            return jedisPool.getResource();
+        }
+    }
+
+    /**
+     * Closes the jedis instance after finishing the command.
+     *
+     * @param jedis The jedis instance
+     */
+    private void releaseInstance(final Jedis jedis) {
+        if (jedis == null) {
+            return;
+        }
+        try {
+            jedis.close();
+        } catch (Exception e) {
+            LOG.error("Failed to close (return) instance to pool", e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/b2955a74/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommand.java
----------------------------------------------------------------------
diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommand.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommand.java
new file mode 100644
index 0000000..cf9842c
--- /dev/null
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommand.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.mapper;
+
+/**
+ * All available commands for Redis. Each command belongs to a {@link RedisDataType} group.
+ */
+public enum RedisCommand {
+
+    /**
+     * Insert the specified value at the head of the list stored at key.
+     * If key does not exist, it is created as empty list before performing the push operations.
+     */
+    LPUSH(RedisDataType.LIST),
+
+    /**
+     * Insert the specified value at the tail of the list stored at key.
+     * If key does not exist, it is created as empty list before performing the push operation.
+     */
+    RPUSH(RedisDataType.LIST),
+
+    /**
+     * Add the specified member to the set stored at key.
+     * Specified member that is already a member of this set is ignored.
+     */
+    SADD(RedisDataType.SET),
+
+    /**
+     * Set key to hold the string value. If key already holds a value,
+     * it is overwritten, regardless of its type.
+     */
+    SET(RedisDataType.STRING),
+
+    /**
+     * Adds the element to the HyperLogLog data structure stored at the variable name specified as first argument.
+     */
+    PFADD(RedisDataType.HYPER_LOG_LOG),
+
+    /**
+     * Posts a message to the given channel.
+     */
+    PUBLISH(RedisDataType.PUBSUB),
+
+    /**
+     * Adds the specified members with the specified score to the sorted set stored at key.
+     */
+    ZADD(RedisDataType.SORTED_SET),
+
+    /**
+     * Sets field in the hash stored at key to value. If key does not exist,
+     * a new key holding a hash is created. If field already exists in the hash, it is overwritten.
+     */
+    HSET(RedisDataType.HASH);
+
+    /**
+     * The {@link RedisDataType} this command belongs to.
+     */
+    private RedisDataType redisDataType;
+
+    RedisCommand(RedisDataType redisDataType) {
+        this.redisDataType = redisDataType;
+    }
+
+
+    /**
+     * The {@link RedisDataType} this command belongs to.
+     * @return the {@link RedisDataType}
+     */
+    public RedisDataType getRedisDataType(){
+        return redisDataType;
+    }
+}

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/b2955a74/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommandDescription.java
----------------------------------------------------------------------
diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommandDescription.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommandDescription.java
new file mode 100644
index 0000000..6ab329f
--- /dev/null
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommandDescription.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.mapper;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+/**
+ * The description of the command type. This must be passed while creating new {@link RedisMapper}.
+ * <p>When creating descriptor for the group of {@link RedisDataType#HASH} and {@link RedisDataType#SORTED_SET},
+ * you need to use first constructor {@link #RedisCommandDescription(RedisCommand, String)}.
+ * If the {@code additionalKey} is {@code null} it will throw {@code IllegalArgumentException}
+ *
+ * <p>When {@link RedisCommand} is not in group of {@link RedisDataType#HASH} and {@link RedisDataType#SORTED_SET}
+ * you can use second constructor {@link #RedisCommandDescription(RedisCommand)}
+ */
+public class RedisCommandDescription implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private RedisCommand redisCommand;
+
+    /**
+     * This additional key is needed for the group {@link RedisDataType#HASH} and {@link RedisDataType#SORTED_SET}.
+     * Other {@link RedisDataType} works only with two variable i.e. name of the list and value to be added.
+     * But for {@link RedisDataType#HASH} and {@link RedisDataType#SORTED_SET} we need three variables.
+     * <p>For {@link RedisDataType#HASH} we need hash name, hash key and element.
+     * {@link #getAdditionalKey()} used as hash name for {@link RedisDataType#HASH}
+     * <p>For {@link RedisDataType#SORTED_SET} we need set name, the element and it's score.
+     * {@link #getAdditionalKey()} used as set name for {@link RedisDataType#SORTED_SET}
+     */
+    private String additionalKey;
+
+    /**
+     * Use this constructor when data type is {@link RedisDataType#HASH} or {@link RedisDataType#SORTED_SET}.
+     * If different data type is specified, {@code additionalKey} is ignored.
+     * @param redisCommand the redis command type {@link RedisCommand}
+     * @param additionalKey additional key for Hash and Sorted set data type
+     */
+    public RedisCommandDescription(RedisCommand redisCommand, String additionalKey) {
+        Objects.requireNonNull(redisCommand, "Redis command type can not be null");
+        this.redisCommand = redisCommand;
+        this.additionalKey = additionalKey;
+
+        if (redisCommand.getRedisDataType() == RedisDataType.HASH ||
+            redisCommand.getRedisDataType() == RedisDataType.SORTED_SET) {
+            if (additionalKey == null) {
+                throw new IllegalArgumentException("Hash and Sorted Set should have additional key");
+            }
+        }
+    }
+
+    /**
+     * Use this constructor when command type is not in group {@link RedisDataType#HASH} or {@link RedisDataType#SORTED_SET}.
+     *
+     * @param redisCommand the redis data type {@link RedisCommand}
+     */
+    public RedisCommandDescription(RedisCommand redisCommand) {
+        this(redisCommand, null);
+    }
+
+    /**
+     * Returns the {@link RedisCommand}.
+     *
+     * @return the command type of the mapping
+     */
+    public RedisCommand getCommand() {
+        return redisCommand;
+    }
+
+    /**
+     * Returns the additional key if data type is {@link RedisDataType#HASH} and {@link RedisDataType#SORTED_SET}.
+     *
+     * @return the additional key
+     */
+    public String getAdditionalKey() {
+        return additionalKey;
+    }
+}

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/b2955a74/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisDataType.java
----------------------------------------------------------------------
diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisDataType.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisDataType.java
new file mode 100644
index 0000000..989221c
--- /dev/null
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisDataType.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.mapper;
+
+/**
+ * All available data type for Redis.
+ */
+public enum RedisDataType {
+
+    /**
+     * Strings are the most basic kind of Redis value. Redis Strings are binary safe,
+     * this means that a Redis string can contain any kind of data, for instance a JPEG image or a serialized Ruby object.
+     * A String value can be at max 512 Megabytes in length.
+     */
+    STRING,
+
+    /**
+     * Redis Hashes are maps between string fields and string values.
+     */
+    HASH,
+
+    /**
+     * Redis Lists are simply lists of strings, sorted by insertion order.
+     */
+    LIST,
+
+    /**
+     * Redis Sets are an unordered collection of Strings.
+     */
+    SET,
+
+    /**
+     * Redis Sorted Sets are, similarly to Redis Sets, non repeating collections of Strings.
+     * The difference is that every member of a Sorted Set is associated with score,
+     * that is used in order to take the sorted set ordered, from the smallest to the greatest score.
+     * While members are unique, scores may be repeated.
+     */
+    SORTED_SET,
+
+    /**
+     * HyperLogLog is a probabilistic data structure used in order to count unique things.
+     */
+    HYPER_LOG_LOG,
+
+    /**
+     * Redis implementation of publish and subscribe paradigm. Published messages are characterized into channels,
+     * without knowledge of what (if any) subscribers there may be.
+     * Subscribers express interest in one or more channels, and only receive messages
+     * that are of interest, without knowledge of what (if any) publishers there are.
+     */
+    PUBSUB
+}

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/b2955a74/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisMapper.java
----------------------------------------------------------------------
diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisMapper.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisMapper.java
new file mode 100644
index 0000000..b2580a7
--- /dev/null
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisMapper.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.mapper;
+
+import org.apache.flink.api.common.functions.Function;
+
+import java.io.Serializable;
+
+/**
+ * Function that creates the description how the input data should be mapped to redis type.
+ *<p>Example:
+ *<pre>{@code
+ *private static class RedisTestMapper implements RedisMapper<Tuple2<String, String>> {
+ *    public RedisDataTypeDescription getCommandDescription() {
+ *        return new RedisDataTypeDescription(RedisCommand.PUBLISH);
+ *    }
+ *    public String getKeyFromData(Tuple2<String, String> data) {
+ *        return data.f0;
+ *    }
+ *    public String getValueFromData(Tuple2<String, String> data) {
+ *        return data.f1;
+ *    }
+ *}
+ *}</pre>
+ *
+ * @param <T> The type of the element handled by this {@code RedisMapper}
+ */
+public interface RedisMapper<T> extends Function, Serializable {
+
+    /**
+     * Returns descriptor which defines data type.
+     *
+     * @return data type descriptor
+     */
+    RedisCommandDescription getCommandDescription();
+
+    /**
+     * Extracts key from data.
+     *
+     * @param data source data
+     * @return key
+     */
+    String getKeyFromData(T data);
+
+    /**
+     * Extracts value from data.
+     *
+     * @param data source data
+     * @return value
+     */
+    String getValueFromData(T data);
+}


[3/5] bahir-flink git commit: [BAHIR-54] Drop enforced maven version

Posted by lr...@apache.org.
[BAHIR-54] Drop enforced maven version


Project: http://git-wip-us.apache.org/repos/asf/bahir-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/bahir-flink/commit/9966a0ca
Tree: http://git-wip-us.apache.org/repos/asf/bahir-flink/tree/9966a0ca
Diff: http://git-wip-us.apache.org/repos/asf/bahir-flink/diff/9966a0ca

Branch: refs/heads/master
Commit: 9966a0ca7d28ed8e683dc7caa722c1cb4fbae56e
Parents: 2d28f10
Author: Robert Metzger <rm...@apache.org>
Authored: Fri Aug 19 10:58:19 2016 +0200
Committer: Luciano Resende <lr...@apache.org>
Committed: Fri Aug 19 19:09:02 2016 -0700

----------------------------------------------------------------------
 .travis.yml | 2 +-
 pom.xml     | 8 --------
 2 files changed, 1 insertion(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/9966a0ca/.travis.yml
----------------------------------------------------------------------
diff --git a/.travis.yml b/.travis.yml
index 0d123c4..4c3ba91 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -26,6 +26,6 @@ jdk:
   - oraclejdk7
   - openjdk7
 
-
+install: true
 
 script: mvn clean verify -Dflink.version=$FLINK_VERSION
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/9966a0ca/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 8ed5789..f435d2f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -47,10 +47,6 @@
     <url>https://issues.apache.org/jira/browse/BAHIR</url>
   </issueManagement>
 
-  <prerequisites>
-    <maven>${maven.version}</maven>
-  </prerequisites>
-
   <mailingLists>
     <mailingList>
       <name>Dev Mailing List</name>
@@ -86,7 +82,6 @@
     <java.version>1.7</java.version>
     <scala.binary.version>2.11</scala.binary.version>
 
-    <maven.version>3.3.9</maven.version>
     <slf4j.version>1.7.16</slf4j.version>
     <log4j.version>1.2.17</log4j.version>
 
@@ -178,9 +173,6 @@
               </goals>
               <configuration>
                 <rules>
-                  <requireMavenVersion>
-                    <version>${maven.version}</version>
-                  </requireMavenVersion>
                   <requireJavaVersion>
                     <version>${java.version}</version>
                   </requireJavaVersion>