You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2016/12/14 15:03:41 UTC
[1/2] flink git commit: [FLINK-4429] Remove redis connector (now in
Apache Bahir)
Repository: flink
Updated Branches:
refs/heads/master 79d7e3017 -> 8038ae4c8
http://git-wip-us.apache.org/repos/asf/flink/blob/8038ae4c/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSentinelClusterTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSentinelClusterTest.java b/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSentinelClusterTest.java
deleted file mode 100644
index dc59ba4..0000000
--- a/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSentinelClusterTest.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.redis;
-
-import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisSentinelConfig;
-import org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainer;
-import org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainerBuilder;
-import org.apache.flink.util.TestLogger;
-import org.junit.Assert;
-import redis.clients.jedis.Jedis;
-import redis.clients.jedis.JedisSentinelPool;
-import redis.embedded.RedisCluster;
-import redis.embedded.util.JedisUtil;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Set;
-
-import org.junit.After;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.AfterClass;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-import static org.apache.flink.util.NetUtils.getAvailablePort;
-
-public class RedisSentinelClusterTest extends TestLogger {
-
- private static RedisCluster cluster;
- private static final String REDIS_MASTER = "master";
- private static final String TEST_KEY = "testKey";
- private static final String TEST_VALUE = "testValue";
- private static final List<Integer> sentinels = Arrays.asList(getAvailablePort(), getAvailablePort());
- private static final List<Integer> group1 = Arrays.asList(getAvailablePort(), getAvailablePort());
-
- private JedisSentinelPool jedisSentinelPool;
- private FlinkJedisSentinelConfig jedisSentinelConfig;
-
- @BeforeClass
- public static void setUpCluster(){
- cluster = RedisCluster.builder().sentinelPorts(sentinels).quorumSize(1)
- .serverPorts(group1).replicationGroup(REDIS_MASTER, 1)
- .build();
- cluster.start();
- }
-
- @Before
- public void setUp() {
- Set<String> hosts = JedisUtil.sentinelHosts(cluster);
- jedisSentinelConfig = new FlinkJedisSentinelConfig.Builder().setMasterName(REDIS_MASTER)
- .setSentinels(hosts).build();
- jedisSentinelPool = new JedisSentinelPool(jedisSentinelConfig.getMasterName(),
- jedisSentinelConfig.getSentinels());
- }
-
- @Test
- public void testRedisSentinelOperation() {
- RedisCommandsContainer redisContainer = RedisCommandsContainerBuilder.build(jedisSentinelConfig);
- Jedis jedis = null;
- try{
- jedis = jedisSentinelPool.getResource();
- redisContainer.set(TEST_KEY, TEST_VALUE);
- assertEquals(TEST_VALUE, jedis.get(TEST_KEY));
- }finally {
- if (jedis != null){
- jedis.close();
- }
- }
- }
-
- @After
- public void tearDown() throws IOException {
- if (jedisSentinelPool != null) {
- jedisSentinelPool.close();
- }
- }
-
- @AfterClass
- public static void tearDownCluster() throws IOException {
- if (!cluster.isActive()) {
- cluster.stop();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/8038ae4c/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkITCase.java b/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkITCase.java
deleted file mode 100644
index 21f3cca..0000000
--- a/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkITCase.java
+++ /dev/null
@@ -1,233 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.redis;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.datastream.DataStreamSource;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
-import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
-import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
-import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import redis.clients.jedis.Jedis;
-
-import static org.junit.Assert.assertEquals;
-
-public class RedisSinkITCase extends RedisITCaseBase {
-
- private FlinkJedisPoolConfig jedisPoolConfig;
- private static final Long NUM_ELEMENTS = 20L;
- private static final String REDIS_KEY = "TEST_KEY";
- private static final String REDIS_ADDITIONAL_KEY = "TEST_ADDITIONAL_KEY";
-
- StreamExecutionEnvironment env;
-
-
- private Jedis jedis;
-
- @Before
- public void setUp(){
- jedisPoolConfig = new FlinkJedisPoolConfig.Builder()
- .setHost(REDIS_HOST)
- .setPort(REDIS_PORT).build();
- jedis = new Jedis(REDIS_HOST, REDIS_PORT);
- env = StreamExecutionEnvironment.getExecutionEnvironment();
- }
-
- @Test
- public void testRedisListDataType() throws Exception {
- DataStreamSource<Tuple2<String, String>> source = env.addSource(new TestSourceFunction());
- RedisSink<Tuple2<String, String>> redisSink = new RedisSink<>(jedisPoolConfig,
- new RedisCommandMapper(RedisCommand.LPUSH));
-
- source.addSink(redisSink);
- env.execute("Test Redis List Data Type");
-
- assertEquals(NUM_ELEMENTS, jedis.llen(REDIS_KEY));
-
- jedis.del(REDIS_KEY);
- }
-
- @Test
- public void testRedisSetDataType() throws Exception {
- DataStreamSource<Tuple2<String, String>> source = env.addSource(new TestSourceFunction());
- RedisSink<Tuple2<String, String>> redisSink = new RedisSink<>(jedisPoolConfig,
- new RedisCommandMapper(RedisCommand.SADD));
-
- source.addSink(redisSink);
- env.execute("Test Redis Set Data Type");
-
- assertEquals(NUM_ELEMENTS, jedis.scard(REDIS_KEY));
-
- jedis.del(REDIS_KEY);
- }
-
- @Test
- public void testRedisHyperLogLogDataType() throws Exception {
- DataStreamSource<Tuple2<String, String>> source = env.addSource(new TestSourceFunction());
- RedisSink<Tuple2<String, String>> redisSink = new RedisSink<>(jedisPoolConfig,
- new RedisCommandMapper(RedisCommand.PFADD));
-
- source.addSink(redisSink);
- env.execute("Test Redis Hyper Log Log Data Type");
-
- assertEquals(NUM_ELEMENTS, Long.valueOf(jedis.pfcount(REDIS_KEY)));
-
- jedis.del(REDIS_KEY);
- }
-
- @Test
- public void testRedisSortedSetDataType() throws Exception {
- DataStreamSource<Tuple2<String, String>> source = env.addSource(new TestSourceFunctionSortedSet());
- RedisSink<Tuple2<String, String>> redisSink = new RedisSink<>(jedisPoolConfig,
- new RedisAdditionalDataMapper(RedisCommand.ZADD));
-
- source.addSink(redisSink);
- env.execute("Test Redis Sorted Set Data Type");
-
- assertEquals(NUM_ELEMENTS, jedis.zcard(REDIS_ADDITIONAL_KEY));
-
- jedis.del(REDIS_ADDITIONAL_KEY);
- }
-
- @Test
- public void testRedisHashDataType() throws Exception {
- DataStreamSource<Tuple2<String, String>> source = env.addSource(new TestSourceFunctionHash());
- RedisSink<Tuple2<String, String>> redisSink = new RedisSink<>(jedisPoolConfig,
- new RedisAdditionalDataMapper(RedisCommand.HSET));
-
- source.addSink(redisSink);
- env.execute("Test Redis Hash Data Type");
-
- assertEquals(NUM_ELEMENTS, jedis.hlen(REDIS_ADDITIONAL_KEY));
-
- jedis.del(REDIS_ADDITIONAL_KEY);
- }
-
- @After
- public void tearDown(){
- if(jedis != null){
- jedis.close();
- }
- }
-
- private static class TestSourceFunction implements SourceFunction<Tuple2<String, String>> {
- private static final long serialVersionUID = 1L;
-
- private volatile boolean running = true;
-
- @Override
- public void run(SourceContext<Tuple2<String, String>> ctx) throws Exception {
- for (int i = 0; i < NUM_ELEMENTS && running; i++) {
- ctx.collect(new Tuple2<>(REDIS_KEY, "message #" + i));
- }
- }
-
- @Override
- public void cancel() {
- running = false;
- }
- }
-
- private static class TestSourceFunctionHash implements SourceFunction<Tuple2<String, String>> {
- private static final long serialVersionUID = 1L;
-
- private volatile boolean running = true;
-
- @Override
- public void run(SourceContext<Tuple2<String, String>> ctx) throws Exception {
- for (int i = 0; i < NUM_ELEMENTS && running; i++) {
- ctx.collect(new Tuple2<>("" + i, "message #" + i));
- }
- }
-
- @Override
- public void cancel() {
- running = false;
- }
- }
-
- private static class TestSourceFunctionSortedSet implements SourceFunction<Tuple2<String, String>> {
- private static final long serialVersionUID = 1L;
-
- private volatile boolean running = true;
-
- @Override
- public void run(SourceContext<Tuple2<String, String>> ctx) throws Exception {
- for (int i = 0; i < NUM_ELEMENTS && running; i++) {
- ctx.collect(new Tuple2<>( "message #" + i, "" + i));
- }
- }
-
- @Override
- public void cancel() {
- running = false;
- }
- }
-
- public static class RedisCommandMapper implements RedisMapper<Tuple2<String, String>>{
-
- private RedisCommand redisCommand;
-
- public RedisCommandMapper(RedisCommand redisCommand){
- this.redisCommand = redisCommand;
- }
-
- @Override
- public RedisCommandDescription getCommandDescription() {
- return new RedisCommandDescription(redisCommand);
- }
-
- @Override
- public String getKeyFromData(Tuple2<String, String> data) {
- return data.f0;
- }
-
- @Override
- public String getValueFromData(Tuple2<String, String> data) {
- return data.f1;
- }
- }
-
- public static class RedisAdditionalDataMapper implements RedisMapper<Tuple2<String, String>>{
-
- private RedisCommand redisCommand;
-
- public RedisAdditionalDataMapper(RedisCommand redisCommand){
- this.redisCommand = redisCommand;
- }
-
- @Override
- public RedisCommandDescription getCommandDescription() {
- return new RedisCommandDescription(redisCommand, REDIS_ADDITIONAL_KEY);
- }
-
- @Override
- public String getKeyFromData(Tuple2<String, String> data) {
- return data.f0;
- }
-
- @Override
- public String getValueFromData(Tuple2<String, String> data) {
- return data.f1;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/8038ae4c/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkPublishITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkPublishITCase.java b/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkPublishITCase.java
deleted file mode 100644
index caf3945..0000000
--- a/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkPublishITCase.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.redis;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.datastream.DataStreamSource;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
-import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
-import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
-import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
-
-import redis.clients.jedis.JedisPool;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.junit.Before;
-import org.junit.After;
-import org.junit.Test;
-import redis.clients.jedis.JedisPubSub;
-
-import static org.junit.Assert.assertEquals;
-
-public class RedisSinkPublishITCase extends RedisITCaseBase {
-
- private static final int NUM_ELEMENTS = 20;
- private static final String REDIS_CHANNEL = "CHANNEL";
-
- private static final List<String> sourceList = new ArrayList<>();
- private Thread sinkThread;
- private PubSub pubSub;
-
- @Before
- public void before() throws Exception {
- pubSub = new PubSub();
- sinkThread = new Thread(new Subscribe(pubSub));
- }
-
- @Test
- public void redisSinkTest() throws Exception {
- sinkThread.start();
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- FlinkJedisPoolConfig jedisPoolConfig = new FlinkJedisPoolConfig.Builder()
- .setHost(REDIS_HOST)
- .setPort(REDIS_PORT).build();
- DataStreamSource<Tuple2<String, String>> source = env.addSource(new TestSourceFunction());
-
- RedisSink<Tuple2<String, String>> redisSink = new RedisSink<>(jedisPoolConfig, new RedisTestMapper());
-
- source.addSink(redisSink);
-
- env.execute("Redis Sink Test");
-
- assertEquals(NUM_ELEMENTS, sourceList.size());
- }
-
- @After
- public void after() throws Exception {
- pubSub.unsubscribe();
- sinkThread.join();
- sourceList.clear();
- }
-
- private class Subscribe implements Runnable {
- private PubSub localPubSub;
- private Subscribe(PubSub pubSub){
- this.localPubSub = pubSub;
- }
-
- @Override
- public void run() {
- JedisPool pool = new JedisPool(REDIS_HOST, REDIS_PORT);
- pool.getResource().subscribe(localPubSub, REDIS_CHANNEL);
- }
- }
-
- private static class TestSourceFunction implements SourceFunction<Tuple2<String, String>> {
- private static final long serialVersionUID = 1L;
-
- private volatile boolean running = true;
-
- @Override
- public void run(SourceContext<Tuple2<String, String>> ctx) throws Exception {
- for (int i = 0; i < NUM_ELEMENTS && running; i++) {
- ctx.collect(new Tuple2<>(REDIS_CHANNEL, "message #" + i));
- }
- }
-
- @Override
- public void cancel() {
- running = false;
- }
- }
-
- public static class PubSub extends JedisPubSub {
-
- @Override
- public void onMessage(String channel, String message) {
- sourceList.add(message);
- }
-
- }
-
- private static class RedisTestMapper implements RedisMapper<Tuple2<String, String>>{
-
- @Override
- public RedisCommandDescription getCommandDescription() {
- return new RedisCommandDescription(RedisCommand.PUBLISH);
- }
-
- @Override
- public String getKeyFromData(Tuple2<String, String> data) {
- return data.f0;
- }
-
- @Override
- public String getValueFromData(Tuple2<String, String> data) {
- return data.f1;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/8038ae4c/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkTest.java b/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkTest.java
deleted file mode 100644
index 59f59f2..0000000
--- a/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkTest.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.redis;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisClusterConfig;
-import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisConfigBase;
-import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
-import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisSentinelConfig;
-import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
-import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
-import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
-import org.apache.flink.util.TestLogger;
-import org.junit.Test;
-import redis.clients.jedis.exceptions.JedisConnectionException;
-
-import java.net.InetSocketAddress;
-import java.util.HashSet;
-import java.util.Set;
-
-import static org.junit.Assert.fail;
-
-public class RedisSinkTest extends TestLogger {
-
- @Test(expected=NullPointerException.class)
- public void shouldThrowNullPointExceptionIfDataMapperIsNull(){
- new RedisSink<>(new FlinkJedisClusterConfig.Builder().build(), null);
- }
-
- @Test(expected = NullPointerException.class)
- public void shouldThrowNullPointerExceptionIfCommandDescriptionIsNull(){
- new RedisSink<>(new FlinkJedisClusterConfig.Builder().build(), new TestMapper(null));
- }
-
- @Test(expected = NullPointerException.class)
- public void shouldThrowNullPointerExceptionIfConfigurationIsNull(){
- new RedisSink<>(null, new TestMapper(new RedisCommandDescription(RedisCommand.LPUSH)));
- }
-
- @Test
- public void testRedisDownBehavior() throws Exception {
-
- // create a wrong configuration so that open() fails.
-
- FlinkJedisPoolConfig wrongJedisPoolConfig = new FlinkJedisPoolConfig.Builder()
- .setHost("127.0.0.1")
- .setPort(1234).build();
-
- testDownBehavior(wrongJedisPoolConfig);
- }
-
- @Test
- public void testRedisClusterDownBehavior() throws Exception {
-
- Set<InetSocketAddress> hosts = new HashSet<>();
- hosts.add(new InetSocketAddress("127.0.0.1", 1234));
-
- // create a wrong configuration so that open() fails.
-
- FlinkJedisClusterConfig wrongJedisClusterConfig = new FlinkJedisClusterConfig.Builder()
- .setNodes(hosts)
- .setTimeout(100)
- .setMaxIdle(1)
- .setMaxTotal(1)
- .setMinIdle(1).build();
-
- testDownBehavior(wrongJedisClusterConfig);
- }
-
- @Test
- public void testRedisSentinelDownBehavior() throws Exception {
-
- Set<String> hosts = new HashSet<>();
- hosts.add("localhost:55095");
-
- // create a wrong configuration so that open() fails.
-
- FlinkJedisSentinelConfig wrongJedisSentinelConfig = new FlinkJedisSentinelConfig.Builder()
- .setMasterName("master")
- .setSentinels(hosts)
- .build();
-
- testDownBehavior(wrongJedisSentinelConfig);
- }
-
- private void testDownBehavior(FlinkJedisConfigBase config) throws Exception {
- RedisSink<Tuple2<String, String>> redisSink = new RedisSink<>(config,
- new RedisSinkITCase.RedisCommandMapper(RedisCommand.SADD));
-
- try {
- redisSink.open(new Configuration());
- } catch (Exception e) {
-
- // search for nested JedisConnectionExceptions
- // because this is the expected behavior
-
- Throwable t = e;
- int depth = 0;
- while (!(t instanceof JedisConnectionException)) {
- t = t.getCause();
- if (t == null || depth++ == 20) {
- throw e;
- }
- }
- }
- }
-
- private class TestMapper implements RedisMapper<Tuple2<String, String>>{
- private RedisCommandDescription redisCommandDescription;
-
- public TestMapper(RedisCommandDescription redisCommandDescription){
- this.redisCommandDescription = redisCommandDescription;
- }
- @Override
- public RedisCommandDescription getCommandDescription() {
- return redisCommandDescription;
- }
-
- @Override
- public String getKeyFromData(Tuple2<String, String> data) {
- return data.f0;
- }
-
- @Override
- public String getValueFromData(Tuple2<String, String> data) {
- return data.f1;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/8038ae4c/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBaseTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBaseTest.java b/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBaseTest.java
deleted file mode 100644
index ed1d713..0000000
--- a/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBaseTest.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.redis.common.config;
-
-import org.apache.flink.util.TestLogger;
-import org.junit.Test;
-
-public class FlinkJedisConfigBaseTest extends TestLogger {
-
- @Test(expected = IllegalArgumentException.class)
- public void shouldThrowIllegalArgumentExceptionIfTimeOutIsNegative(){
- new TestConfig(-1, 0, 0, 0);
- }
-
- @Test(expected = IllegalArgumentException.class)
- public void shouldThrowIllegalArgumentExceptionIfMaxTotalIsNegative(){
- new TestConfig(1, -1, 0, 0);
- }
-
- @Test(expected = IllegalArgumentException.class)
- public void shouldThrowIllegalArgumentExceptionIfMaxIdleIsNegative(){
- new TestConfig(0, 0, -1, 0);
- }
-
- @Test(expected = IllegalArgumentException.class)
- public void shouldThrowIllegalArgumentExceptionIfMinIdleIsNegative(){
- new TestConfig(0, 0, 0, -1);
- }
-
- private class TestConfig extends FlinkJedisConfigBase{
-
- protected TestConfig(int connectionTimeout, int maxTotal, int maxIdle, int minIdle) {
- super(connectionTimeout, maxTotal, maxIdle, minIdle);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/8038ae4c/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisClusterConfigTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisClusterConfigTest.java b/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisClusterConfigTest.java
deleted file mode 100644
index 40db578..0000000
--- a/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisClusterConfigTest.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.redis.common.config;
-
-import org.apache.flink.util.TestLogger;
-import org.junit.Test;
-
-import java.net.InetSocketAddress;
-import java.util.HashSet;
-import java.util.Set;
-
-public class JedisClusterConfigTest extends TestLogger {
-
- @Test(expected = NullPointerException.class)
- public void shouldThrowNullPointExceptionIfNodeValueIsNull(){
- FlinkJedisClusterConfig.Builder builder = new FlinkJedisClusterConfig.Builder();
- builder.setMinIdle(0)
- .setMaxIdle(0)
- .setMaxTotal(0)
- .setTimeout(0)
- .build();
- }
-
- @Test(expected = IllegalArgumentException.class)
- public void shouldThrowIllegalArgumentExceptionIfNodeValuesAreEmpty(){
- Set<InetSocketAddress> set = new HashSet<>();
- FlinkJedisClusterConfig.Builder builder = new FlinkJedisClusterConfig.Builder();
- builder.setMinIdle(0)
- .setMaxIdle(0)
- .setMaxTotal(0)
- .setTimeout(0)
- .setNodes(set)
- .build();
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/8038ae4c/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisPoolConfigTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisPoolConfigTest.java b/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisPoolConfigTest.java
deleted file mode 100644
index dc16cfe..0000000
--- a/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisPoolConfigTest.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.redis.common.config;
-
-import org.apache.flink.util.TestLogger;
-import org.junit.Test;
-
-public class JedisPoolConfigTest extends TestLogger {
-
- @Test(expected = NullPointerException.class)
- public void shouldThrowNullPointExceptionIfHostValueIsNull(){
- FlinkJedisPoolConfig.Builder builder = new FlinkJedisPoolConfig.Builder();
- builder.build();
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/8038ae4c/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisSentinelConfigTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisSentinelConfigTest.java b/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisSentinelConfigTest.java
deleted file mode 100644
index 8445fae..0000000
--- a/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisSentinelConfigTest.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.redis.common.config;
-
-import org.apache.flink.util.TestLogger;
-import org.junit.Test;
-
-import java.util.HashSet;
-import java.util.Set;
-
-public class JedisSentinelConfigTest extends TestLogger {
-
- public static final String MASTER_NAME = "test-master";
-
- @Test(expected = NullPointerException.class)
- public void shouldThrowNullPointExceptionIfMasterValueIsNull(){
- FlinkJedisSentinelConfig.Builder builder = new FlinkJedisSentinelConfig.Builder();
- Set<String> sentinels = new HashSet<>();
- sentinels.add("127.0.0.1");
- builder.setSentinels(sentinels).build();
- }
-
- @Test(expected = NullPointerException.class)
- public void shouldThrowNullPointExceptionIfSentinelsValueIsNull(){
- FlinkJedisSentinelConfig.Builder builder = new FlinkJedisSentinelConfig.Builder();
- builder.setMasterName(MASTER_NAME).build();
- }
-
- @Test(expected = IllegalArgumentException.class)
- public void shouldThrowNullPointExceptionIfSentinelsValueIsEmpty(){
- FlinkJedisSentinelConfig.Builder builder = new FlinkJedisSentinelConfig.Builder();
- Set<String> sentinels = new HashSet<>();
- builder.setMasterName(MASTER_NAME).setSentinels(sentinels).build();
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/8038ae4c/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisDataTypeDescriptionTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisDataTypeDescriptionTest.java b/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisDataTypeDescriptionTest.java
deleted file mode 100644
index b0eee48..0000000
--- a/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisDataTypeDescriptionTest.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.redis.common.mapper;
-
-import org.apache.flink.streaming.connectors.redis.RedisSinkITCase;
-import org.apache.flink.util.TestLogger;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-
-public class RedisDataTypeDescriptionTest extends TestLogger {
-
- @Test(expected=IllegalArgumentException.class)
- public void shouldThrowExceptionIfAdditionalKeyIsNotGivenForHashDataType(){
- RedisSinkITCase.RedisCommandMapper redisCommandMapper = new RedisSinkITCase.RedisCommandMapper(RedisCommand.HSET);
- redisCommandMapper.getCommandDescription();
- }
-
- @Test
- public void shouldReturnNullForAdditionalDataType(){
- RedisSinkITCase.RedisCommandMapper redisCommandMapper = new RedisSinkITCase.RedisCommandMapper(RedisCommand.LPUSH);
- RedisCommandDescription redisDataTypeDescription = redisCommandMapper.getCommandDescription();
- assertEquals(RedisDataType.LIST, redisDataTypeDescription.getCommand().getRedisDataType());
- assertNull(redisDataTypeDescription.getAdditionalKey());
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/8038ae4c/flink-connectors/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/pom.xml b/flink-connectors/pom.xml
index dcb33eb..695c34b 100644
--- a/flink-connectors/pom.xml
+++ b/flink-connectors/pom.xml
@@ -52,7 +52,6 @@ under the License.
<module>flink-connector-twitter</module>
<module>flink-connector-nifi</module>
<module>flink-connector-cassandra</module>
- <module>flink-connector-redis</module>
<module>flink-connector-filesystem</module>
</modules>
[2/2] flink git commit: [FLINK4429] Remove redis connector (now in
Apache Bahir)
Posted by rm...@apache.org.
[FLINK4429] Remove redis connector (now in Apache Bahir)
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8038ae4c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8038ae4c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8038ae4c
Branch: refs/heads/master
Commit: 8038ae4c843f802c195e757100b1bbc1d5de3ea8
Parents: 79d7e30
Author: Robert Metzger <rm...@apache.org>
Authored: Wed Dec 14 14:56:04 2016 +0100
Committer: Robert Metzger <rm...@apache.org>
Committed: Wed Dec 14 16:02:38 2016 +0100
----------------------------------------------------------------------
docs/dev/connectors/index.md | 1 -
docs/dev/connectors/redis.md | 174 -------------
flink-connectors/flink-connector-redis/pom.xml | 79 ------
.../streaming/connectors/redis/RedisSink.java | 188 --------------
.../common/config/FlinkJedisClusterConfig.java | 187 -------------
.../common/config/FlinkJedisConfigBase.java | 90 -------
.../common/config/FlinkJedisPoolConfig.java | 224 ----------------
.../common/config/FlinkJedisSentinelConfig.java | 259 -------------------
.../common/container/RedisClusterContainer.java | 171 ------------
.../container/RedisCommandsContainer.java | 115 --------
.../RedisCommandsContainerBuilder.java | 116 ---------
.../redis/common/container/RedisContainer.java | 252 ------------------
.../redis/common/mapper/RedisCommand.java | 86 ------
.../common/mapper/RedisCommandDescription.java | 94 -------
.../redis/common/mapper/RedisDataType.java | 66 -----
.../redis/common/mapper/RedisMapper.java | 66 -----
.../connectors/redis/RedisITCaseBase.java | 45 ----
.../redis/RedisSentinelClusterTest.java | 100 -------
.../connectors/redis/RedisSinkITCase.java | 233 -----------------
.../redis/RedisSinkPublishITCase.java | 137 ----------
.../connectors/redis/RedisSinkTest.java | 144 -----------
.../common/config/FlinkJedisConfigBaseTest.java | 50 ----
.../common/config/JedisClusterConfigTest.java | 49 ----
.../common/config/JedisPoolConfigTest.java | 29 ---
.../common/config/JedisSentinelConfigTest.java | 49 ----
.../mapper/RedisDataTypeDescriptionTest.java | 41 ---
flink-connectors/pom.xml | 1 -
27 files changed, 3046 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/8038ae4c/docs/dev/connectors/index.md
----------------------------------------------------------------------
diff --git a/docs/dev/connectors/index.md b/docs/dev/connectors/index.md
index 5de5300..8764463 100644
--- a/docs/dev/connectors/index.md
+++ b/docs/dev/connectors/index.md
@@ -38,7 +38,6 @@ Currently these systems are supported:
* [Twitter Streaming API](https://dev.twitter.com/docs/streaming-apis) (source)
* [Apache NiFi](https://nifi.apache.org) (sink/source)
* [Apache Cassandra](https://cassandra.apache.org/) (sink)
- * [Redis](http://redis.io/) (sink)
To run an application using one of these connectors, additional third party
components are usually required to be installed and launched, e.g. the servers
http://git-wip-us.apache.org/repos/asf/flink/blob/8038ae4c/docs/dev/connectors/redis.md
----------------------------------------------------------------------
diff --git a/docs/dev/connectors/redis.md b/docs/dev/connectors/redis.md
deleted file mode 100644
index 0e3287d..0000000
--- a/docs/dev/connectors/redis.md
+++ /dev/null
@@ -1,174 +0,0 @@
----
-title: "Redis Connector"
-nav-title: Redis
-nav-parent_id: connectors
-nav-pos: 8
----
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-
-This connector provides a Sink that can write to
-[Redis](http://redis.io/) and also can publish data to [Redis PubSub](http://redis.io/topics/pubsub). To use this connector, add the
-following dependency to your project:
-{% highlight xml %}
-<dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-connector-redis{{ site.scala_version_suffix }}</artifactId>
- <version>{{site.version }}</version>
-</dependency>
-{% endhighlight %}
-Version Compatibility: This module is compatible with Redis 2.8.5.
-
-Note that the streaming connectors are currently not part of the binary distribution. You need to link them for cluster execution [explicitly]({{site.baseurl}}/dev/linking).
-
-#### Installing Redis
-Follow the instructions from the [Redis download page](http://redis.io/download).
-
-#### Redis Sink
-A class providing an interface for sending data to Redis.
-The sink can use three different methods for communicating with different types of Redis environments:
-1. Single Redis Server
-2. Redis Cluster
-3. Redis Sentinel
-
-This code shows how to create a sink that communicates with a single Redis server:
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-public static class RedisExampleMapper implements RedisMapper<Tuple2<String, String>>{
-
- @Override
- public RedisCommandDescription getCommandDescription() {
- return new RedisCommandDescription(RedisCommand.HSET, "HASH_NAME");
- }
-
- @Override
- public String getKeyFromData(Tuple2<String, String> data) {
- return data.f0;
- }
-
- @Override
- public String getValueFromData(Tuple2<String, String> data) {
- return data.f1;
- }
-}
-FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("127.0.0.1").build();
-
-DataStream<Tuple2<String, String>> stream = ...;
-stream.addSink(new RedisSink<Tuple2<String, String>>(conf, new RedisExampleMapper()));
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-class RedisExampleMapper extends RedisMapper[(String, String)]{
- override def getCommandDescription: RedisCommandDescription = {
- new RedisCommandDescription(RedisCommand.HSET, "HASH_NAME")
- }
-
- override def getKeyFromData(data: (String, String)): String = data._1
-
- override def getValueFromData(data: (String, String)): String = data._2
-}
-val conf = new FlinkJedisPoolConfig.Builder().setHost("127.0.0.1").build()
-stream.addSink(new RedisSink[(String, String)](conf, new RedisExampleMapper))
-{% endhighlight %}
-</div>
-</div>
-
-This example code does the same, but for Redis Cluster:
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-
-FlinkJedisClusterConfig conf = new FlinkJedisClusterConfig.Builder()
- .setNodes(new HashSet<InetSocketAddress>(Arrays.asList(new InetSocketAddress(5601)))).build();
-
-DataStream<Tuple2<String, String>> stream = ...;
-stream.addSink(new RedisSink<Tuple2<String, String>>(conf, new RedisExampleMapper()));
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val conf = new FlinkJedisClusterConfig.Builder().setNodes(...).build()
-stream.addSink(new RedisSink[(String, String)](conf, new RedisExampleMapper))
-{% endhighlight %}
-</div>
-</div>
-
-This example shows when the Redis environment is with Sentinels:
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-
-FlinkJedisSentinelConfig conf = new FlinkJedisSentinelConfig.Builder()
- .setMasterName("master").setSentinels(...).build();
-
-DataStream<Tuple2<String, String>> stream = ...;
-stream.addSink(new RedisSink<Tuple2<String, String>>(conf, new RedisExampleMapper()));
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val conf = new FlinkJedisSentinelConfig.Builder().setMasterName("master").setSentinels(...).build()
-stream.addSink(new RedisSink[(String, String)](conf, new RedisExampleMapper))
-{% endhighlight %}
-</div>
-</div>
-
-This section describes all the available data types and the Redis command used for each of them.
-
-<table class="table table-bordered" style="width: 75%">
- <thead>
- <tr>
- <th class="text-center" style="width: 20%">Data Type</th>
- <th class="text-center" style="width: 25%">Redis Command [Sink]</th>
- <th class="text-center" style="width: 25%">Redis Command [Source]</th>
- </tr>
- </thead>
- <tbody>
- <tr>
- <td>HASH</td><td><a href="http://redis.io/commands/hset">HSET</a></td><td>--NA--</td>
- </tr>
- <tr>
- <td>LIST</td><td>
- <a href="http://redis.io/commands/rpush">RPUSH</a>,
- <a href="http://redis.io/commands/lpush">LPUSH</a>
- </td><td>--NA--</td>
- </tr>
- <tr>
- <td>SET</td><td><a href="http://redis.io/commands/sadd">SADD</a></td><td>--NA--</td>
- </tr>
- <tr>
- <td>PUBSUB</td><td><a href="http://redis.io/commands/publish">PUBLISH</a></td><td>--NA--</td>
- </tr>
- <tr>
- <td>STRING</td><td><a href="http://redis.io/commands/set">SET</a></td><td>--NA--</td>
- </tr>
- <tr>
- <td>HYPER_LOG_LOG</td><td><a href="http://redis.io/commands/pfadd">PFADD</a></td><td>--NA--</td>
- </tr>
- <tr>
- <td>SORTED_SET</td><td><a href="http://redis.io/commands/zadd">ZADD</a></td><td>--NA--</td>
- </tr>
- </tbody>
-</table>
-More about Redis can be found [here](http://redis.io/).
http://git-wip-us.apache.org/repos/asf/flink/blob/8038ae4c/flink-connectors/flink-connector-redis/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-redis/pom.xml b/flink-connectors/flink-connector-redis/pom.xml
deleted file mode 100644
index a348f31..0000000
--- a/flink-connectors/flink-connector-redis/pom.xml
+++ /dev/null
@@ -1,79 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-
- <modelVersion>4.0.0</modelVersion>
-
- <parent>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-connectors</artifactId>
- <version>1.2-SNAPSHOT</version>
- <relativePath>..</relativePath>
- </parent>
-
- <artifactId>flink-connector-redis_2.10</artifactId>
- <name>flink-connector-redis</name>
-
- <packaging>jar</packaging>
-
- <properties>
- <jedis.version>2.8.0</jedis.version>
- </properties>
-
- <dependencies>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-streaming-java_2.10</artifactId>
- <version>${project.version}</version>
- <scope>provided</scope>
- </dependency>
-
- <dependency>
- <groupId>redis.clients</groupId>
- <artifactId>jedis</artifactId>
- <version>${jedis.version}</version>
- </dependency>
-
- <dependency>
- <groupId>com.github.kstyrc</groupId>
- <artifactId>embedded-redis</artifactId>
- <version>0.6</version>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-streaming-java_2.10</artifactId>
- <version>${project.version}</version>
- <scope>test</scope>
- <type>test-jar</type>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-test-utils_2.10</artifactId>
- <version>${project.version}</version>
- <scope>test</scope>
- </dependency>
- </dependencies>
-
-</project>
http://git-wip-us.apache.org/repos/asf/flink/blob/8038ae4c/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java b/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java
deleted file mode 100644
index f6b0fd7..0000000
--- a/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java
+++ /dev/null
@@ -1,188 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.redis;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisClusterConfig;
-import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisConfigBase;
-import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
-import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisSentinelConfig;
-import org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainer;
-import org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainerBuilder;
-import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
-import org.apache.flink.streaming.connectors.redis.common.mapper.RedisDataType;
-import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
-import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
-
-import org.apache.flink.util.Preconditions;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-
-/**
- * A sink that delivers data to a Redis channel using the Jedis client.
- * <p> The sink takes two arguments {@link FlinkJedisConfigBase} and {@link RedisMapper}.
- * <p> When {@link FlinkJedisPoolConfig} is passed as the first argument,
- * the sink will create connection using {@link redis.clients.jedis.JedisPool}. Please use this when
- * you want to connect to a single Redis server.
- * <p> When {@link FlinkJedisSentinelConfig} is passed as the first argument, the sink will create connection
- * using {@link redis.clients.jedis.JedisSentinelPool}. Please use this when you want to connect to Sentinel.
- * <p> Please use {@link FlinkJedisClusterConfig} as the first argument if you want to connect to
- * a Redis Cluster.
- *
- * <p>Example:
- *
- * <pre>
- *{@code
- *public static class RedisExampleMapper implements RedisMapper<Tuple2<String, String>> {
- *
- * private RedisCommand redisCommand;
- *
- * public RedisExampleMapper(RedisCommand redisCommand){
- * this.redisCommand = redisCommand;
- * }
- * public RedisCommandDescription getCommandDescription() {
- * return new RedisCommandDescription(redisCommand, REDIS_ADDITIONAL_KEY);
- * }
- * public String getKeyFromData(Tuple2<String, String> data) {
- * return data.f0;
- * }
- * public String getValueFromData(Tuple2<String, String> data) {
- * return data.f1;
- * }
- *}
- *FlinkJedisPoolConfig jedisPoolConfig = new FlinkJedisPoolConfig.Builder()
- * .setHost(REDIS_HOST).setPort(REDIS_PORT).build();
- *new RedisSink<Tuple2<String, String>>(jedisPoolConfig, new RedisExampleMapper(RedisCommand.LPUSH));
- *}</pre>
- *
- * @param <IN> Type of the elements emitted by this sink
- */
-public class RedisSink<IN> extends RichSinkFunction<IN> {
-
- private static final long serialVersionUID = 1L;
-
- private static final Logger LOG = LoggerFactory.getLogger(RedisSink.class);
-
- /**
- * This additional key is needed for {@link RedisDataType#HASH} and {@link RedisDataType#SORTED_SET}.
- * Other {@link RedisDataType}s work with only two variables, i.e. the name of the list and the value to be added.
- * But for {@link RedisDataType#HASH} and {@link RedisDataType#SORTED_SET} we need three variables.
- * <p>For {@link RedisDataType#HASH} we need the hash name, the hash key and the element.
- * {@code additionalKey} is used as the hash name for {@link RedisDataType#HASH}.
- * <p>For {@link RedisDataType#SORTED_SET} we need the set name, the element and its score.
- * {@code additionalKey} is used as the set name for {@link RedisDataType#SORTED_SET}.
- */
- private String additionalKey;
- private RedisMapper<IN> redisSinkMapper;
- private RedisCommand redisCommand;
-
- private FlinkJedisConfigBase flinkJedisConfigBase;
- private RedisCommandsContainer redisCommandsContainer;
-
- /**
- * Creates a new {@link RedisSink} that connects to the Redis server.
- *
- * @param flinkJedisConfigBase The configuration of {@link FlinkJedisConfigBase}
- * @param redisSinkMapper This is used to generate Redis command and key value from incoming elements.
- */
- public RedisSink(FlinkJedisConfigBase flinkJedisConfigBase, RedisMapper<IN> redisSinkMapper) {
- Preconditions.checkNotNull(flinkJedisConfigBase, "Redis connection pool config should not be null");
- Preconditions.checkNotNull(redisSinkMapper, "Redis Mapper can not be null");
- Preconditions.checkNotNull(redisSinkMapper.getCommandDescription(), "Redis Mapper data type description can not be null");
-
- this.flinkJedisConfigBase = flinkJedisConfigBase;
-
- this.redisSinkMapper = redisSinkMapper;
- RedisCommandDescription redisCommandDescription = redisSinkMapper.getCommandDescription();
- this.redisCommand = redisCommandDescription.getCommand();
- this.additionalKey = redisCommandDescription.getAdditionalKey();
- }
-
- /**
- * Called when new data arrives to the sink, and forwards it to Redis channel.
- * Depending on the specified Redis data type (see {@link RedisDataType}),
- * a different Redis command will be applied.
- * Available commands are RPUSH, LPUSH, SADD, PUBLISH, SET, PFADD, HSET, ZADD.
- *
- * @param input The incoming data
- */
- @Override
- public void invoke(IN input) throws Exception {
- String key = redisSinkMapper.getKeyFromData(input);
- String value = redisSinkMapper.getValueFromData(input);
-
- switch (redisCommand) {
- case RPUSH:
- this.redisCommandsContainer.rpush(key, value);
- break;
- case LPUSH:
- this.redisCommandsContainer.lpush(key, value);
- break;
- case SADD:
- this.redisCommandsContainer.sadd(key, value);
- break;
- case SET:
- this.redisCommandsContainer.set(key, value);
- break;
- case PFADD:
- this.redisCommandsContainer.pfadd(key, value);
- break;
- case PUBLISH:
- this.redisCommandsContainer.publish(key, value);
- break;
- case ZADD:
- this.redisCommandsContainer.zadd(this.additionalKey, value, key);
- break;
- case HSET:
- this.redisCommandsContainer.hset(this.additionalKey, key, value);
- break;
- default:
- throw new IllegalArgumentException("Cannot process such data type: " + redisCommand);
- }
- }
-
- /**
- * Initializes the connection to Redis by either cluster or sentinels or single server.
- *
- * @throws IllegalArgumentException if jedisPoolConfig, jedisClusterConfig and jedisSentinelConfig are all null
- */
- @Override
- public void open(Configuration parameters) throws Exception {
- try {
- this.redisCommandsContainer = RedisCommandsContainerBuilder.build(this.flinkJedisConfigBase);
- this.redisCommandsContainer.open();
- } catch (Exception e) {
- LOG.error("Redis has not been properly initialized: ", e);
- throw e;
- }
- }
-
- /**
- * Closes commands container.
- * @throws IOException if command container is unable to close.
- */
- @Override
- public void close() throws IOException {
- if (redisCommandsContainer != null) {
- redisCommandsContainer.close();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/8038ae4c/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisClusterConfig.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisClusterConfig.java b/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisClusterConfig.java
deleted file mode 100644
index 6e6cfe5..0000000
--- a/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisClusterConfig.java
+++ /dev/null
@@ -1,187 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.redis.common.config;
-
-import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
-import org.apache.flink.util.Preconditions;
-import redis.clients.jedis.HostAndPort;
-import redis.clients.jedis.Protocol;
-
-import java.net.InetSocketAddress;
-import java.util.HashSet;
-import java.util.Set;
-
-/**
- * Configuration for Jedis cluster.
- */
-public class FlinkJedisClusterConfig extends FlinkJedisConfigBase {
- private static final long serialVersionUID = 1L;
-
- private final Set<InetSocketAddress> nodes;
- private final int maxRedirections;
-
-
- /**
- * Jedis cluster configuration.
- * The set of nodes is mandatory; when {@code nodes} is not set, a NullPointerException is thrown.
- *
- * @param nodes list of node information for JedisCluster
- * @param connectionTimeout socket / connection timeout. The default is 2000
- * @param maxRedirections limit of redirections-how much we'll follow MOVED or ASK
- * @param maxTotal the maximum number of objects that can be allocated by the pool
- * @param maxIdle the cap on the number of "idle" instances in the pool
- * @param minIdle the minimum number of idle objects to maintain in the pool
- * @throws NullPointerException if parameter {@code nodes} is {@code null}
- */
- private FlinkJedisClusterConfig(Set<InetSocketAddress> nodes, int connectionTimeout, int maxRedirections,
- int maxTotal, int maxIdle, int minIdle) {
- super(connectionTimeout, maxTotal, maxIdle, minIdle);
-
- Preconditions.checkNotNull(nodes, "Node information should be presented");
- Preconditions.checkArgument(!nodes.isEmpty(), "Redis cluster hosts should not be empty");
- this.nodes = new HashSet<>(nodes);
- this.maxRedirections = maxRedirections;
- }
-
-
-
- /**
- * Returns nodes.
- *
- * @return list of node information
- */
- public Set<HostAndPort> getNodes() {
- Set<HostAndPort> ret = new HashSet<>();
- for (InetSocketAddress node : nodes) {
- ret.add(new HostAndPort(node.getHostName(), node.getPort()));
- }
- return ret;
- }
-
- /**
- * Returns limit of redirection.
- *
- * @return limit of redirection
- */
- public int getMaxRedirections() {
- return maxRedirections;
- }
-
-
- /**
- * Builder for initializing {@link FlinkJedisClusterConfig}.
- */
- public static class Builder {
- private Set<InetSocketAddress> nodes;
- private int timeout = Protocol.DEFAULT_TIMEOUT;
- private int maxRedirections = 5;
- private int maxTotal = GenericObjectPoolConfig.DEFAULT_MAX_TOTAL;
- private int maxIdle = GenericObjectPoolConfig.DEFAULT_MAX_IDLE;
- private int minIdle = GenericObjectPoolConfig.DEFAULT_MIN_IDLE;
-
- /**
- * Sets list of node.
- *
- * @param nodes list of node
- * @return Builder itself
- */
- public Builder setNodes(Set<InetSocketAddress> nodes) {
- this.nodes = nodes;
- return this;
- }
-
- /**
- * Sets socket / connection timeout.
- *
- * @param timeout socket / connection timeout, default value is 2000
- * @return Builder itself
- */
- public Builder setTimeout(int timeout) {
- this.timeout = timeout;
- return this;
- }
-
- /**
- * Sets limit of redirection.
- *
- * @param maxRedirections limit of redirection, default value is 5
- * @return Builder itself
- */
- public Builder setMaxRedirections(int maxRedirections) {
- this.maxRedirections = maxRedirections;
- return this;
- }
-
- /**
- * Sets value for the {@code maxTotal} configuration attribute
- * for pools to be created with this configuration instance.
- *
- * @param maxTotal the maximum number of objects that can be allocated by the pool, default value is 8
- * @return Builder itself
- */
- public Builder setMaxTotal(int maxTotal) {
- this.maxTotal = maxTotal;
- return this;
- }
-
- /**
- * Sets value for the {@code maxIdle} configuration attribute
- * for pools to be created with this configuration instance.
- *
- * @param maxIdle the cap on the number of "idle" instances in the pool, default value is 8
- * @return Builder itself
- */
- public Builder setMaxIdle(int maxIdle) {
- this.maxIdle = maxIdle;
- return this;
- }
-
- /**
- * Sets value for the {@code minIdle} configuration attribute
- * for pools to be created with this configuration instance.
- *
- * @param minIdle the minimum number of idle objects to maintain in the pool, default value is 0
- * @return Builder itself
- */
- public Builder setMinIdle(int minIdle) {
- this.minIdle = minIdle;
- return this;
- }
-
- /**
- * Builds JedisClusterConfig.
- *
- * @return JedisClusterConfig
- */
- public FlinkJedisClusterConfig build() {
- return new FlinkJedisClusterConfig(nodes, timeout, maxRedirections, maxTotal, maxIdle, minIdle);
- }
- }
-
- @Override
- public String toString() {
- return "JedisClusterConfig{" +
- "nodes=" + nodes +
- ", timeout=" + connectionTimeout +
- ", maxRedirections=" + maxRedirections +
- ", maxTotal=" + maxTotal +
- ", maxIdle=" + maxIdle +
- ", minIdle=" + minIdle +
- '}';
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/8038ae4c/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBase.java b/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBase.java
deleted file mode 100644
index a2489b8..0000000
--- a/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBase.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.redis.common.config;
-
-import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
-import org.apache.flink.util.Preconditions;
-
-import java.io.Serializable;
-
-/**
- * Common, immutable pool and timeout settings shared by all Flink Redis configurations.
- *
- * <p>Every value is validated once in the constructor and must not be negative.
- */
-public abstract class FlinkJedisConfigBase implements Serializable {
-    private static final long serialVersionUID = 1L;
-
-    protected final int maxTotal;
-    protected final int maxIdle;
-    protected final int minIdle;
-    protected final int connectionTimeout;
-
-    /**
-     * Stores the given settings after checking that none of them is negative.
-     *
-     * @param connectionTimeout connection timeout
-     * @param maxTotal value for the {@code maxTotal} pool attribute
-     * @param maxIdle value for the {@code maxIdle} pool attribute
-     * @param minIdle value for the {@code minIdle} pool attribute
-     * @throws IllegalArgumentException if any of the arguments is negative
-     */
-    protected FlinkJedisConfigBase(int connectionTimeout, int maxTotal, int maxIdle, int minIdle){
-        // The checks run in the same order as the parameters are listed here.
-        this.connectionTimeout = nonNegative(connectionTimeout, "connection timeout can not be negative");
-        this.maxTotal = nonNegative(maxTotal, "maxTotal value can not be negative");
-        this.maxIdle = nonNegative(maxIdle, "maxIdle value can not be negative");
-        this.minIdle = nonNegative(minIdle, "minIdle value can not be negative");
-    }
-
-    /**
-     * Returns {@code value} unchanged, or fails with the given message when it is negative.
-     */
-    private static int nonNegative(int value, String message) {
-        Preconditions.checkArgument(value >= 0, message);
-        return value;
-    }
-
-    /**
-     * Returns the connection timeout.
-     *
-     * @return connection timeout
-     */
-    public int getConnectionTimeout() {
-        return this.connectionTimeout;
-    }
-
-    /**
-     * Returns the {@code maxTotal} attribute for pools created with this configuration.
-     *
-     * @return the {@code maxTotal} setting of this configuration instance
-     * @see GenericObjectPoolConfig#getMaxTotal()
-     */
-    public int getMaxTotal() {
-        return this.maxTotal;
-    }
-
-    /**
-     * Returns the {@code maxIdle} attribute for pools created with this configuration.
-     *
-     * @return the {@code maxIdle} setting of this configuration instance
-     * @see GenericObjectPoolConfig#getMaxIdle()
-     */
-    public int getMaxIdle() {
-        return this.maxIdle;
-    }
-
-    /**
-     * Returns the {@code minIdle} attribute for pools created with this configuration.
-     *
-     * @return the {@code minIdle} setting of this configuration instance
-     * @see GenericObjectPoolConfig#getMinIdle()
-     */
-    public int getMinIdle() {
-        return this.minIdle;
-    }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/8038ae4c/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisPoolConfig.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisPoolConfig.java b/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisPoolConfig.java
deleted file mode 100644
index d261a35..0000000
--- a/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisPoolConfig.java
+++ /dev/null
@@ -1,224 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.redis.common.config;
-
-import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
-import org.apache.flink.util.Preconditions;
-import redis.clients.jedis.Protocol;
-
-/**
- * Connection settings for a single Redis server that is accessed through a Jedis pool.
- *
- * <p>Instances are immutable and are created through the nested {@link Builder}.
- */
-public class FlinkJedisPoolConfig extends FlinkJedisConfigBase {
-
-    private static final long serialVersionUID = 1L;
-
-    private final String host;
-    private final int port;
-    private final int database;
-    private final String password;
-
-    /**
-     * Creates the pool configuration. Only the host is mandatory.
-     *
-     * @param host hostname or IP
-     * @param port port, default value is 6379
-     * @param connectionTimeout socket / connection timeout, default value is 2000 milli second
-     * @param password password, if any
-     * @param database database index
-     * @param maxTotal the maximum number of objects that can be allocated by the pool, default value is 8
-     * @param maxIdle the cap on the number of "idle" instances in the pool, default value is 8
-     * @param minIdle the minimum number of idle objects to maintain in the pool, default value is 0
-     * @throws NullPointerException if parameter {@code host} is {@code null}
-     */
-    private FlinkJedisPoolConfig(String host, int port, int connectionTimeout, String password, int database,
-            int maxTotal, int maxIdle, int minIdle) {
-        super(connectionTimeout, maxTotal, maxIdle, minIdle);
-        this.host = Preconditions.checkNotNull(host, "Host information should be presented");
-        this.port = port;
-        this.database = database;
-        this.password = password;
-    }
-
-    /**
-     * Returns the Redis host.
-     *
-     * @return hostname or IP
-     */
-    public String getHost() {
-        return this.host;
-    }
-
-    /**
-     * Returns the Redis port.
-     *
-     * @return port
-     */
-    public int getPort() {
-        return this.port;
-    }
-
-    /**
-     * Returns the index of the database to use.
-     *
-     * @return database index
-     */
-    public int getDatabase() {
-        return this.database;
-    }
-
-    /**
-     * Returns the password, which may be {@code null} when none was configured.
-     *
-     * @return password
-     */
-    public String getPassword() {
-        return this.password;
-    }
-
-    @Override
-    public String toString() {
-        // The password is intentionally not part of the textual form.
-        final StringBuilder text = new StringBuilder("JedisPoolConfig{");
-        text.append("host='").append(host).append('\'');
-        text.append(", port=").append(port);
-        text.append(", timeout=").append(connectionTimeout);
-        text.append(", database=").append(database);
-        text.append(", maxTotal=").append(maxTotal);
-        text.append(", maxIdle=").append(maxIdle);
-        text.append(", minIdle=").append(minIdle);
-        text.append('}');
-        return text.toString();
-    }
-
-    /**
-     * Builder for initializing {@link FlinkJedisPoolConfig}.
-     *
-     * <p>Every setter returns the builder itself so that calls can be chained.
-     */
-    public static class Builder {
-        // Connection related values.
-        private String host;
-        private int port = Protocol.DEFAULT_PORT;
-        private int timeout = Protocol.DEFAULT_TIMEOUT;
-        private int database = Protocol.DEFAULT_DATABASE;
-        private String password;
-
-        // Pool sizing values.
-        private int maxTotal = GenericObjectPoolConfig.DEFAULT_MAX_TOTAL;
-        private int maxIdle = GenericObjectPoolConfig.DEFAULT_MAX_IDLE;
-        private int minIdle = GenericObjectPoolConfig.DEFAULT_MIN_IDLE;
-
-        /**
-         * Sets host.
-         *
-         * @param host host
-         * @return Builder itself
-         */
-        public Builder setHost(String host) {
-            this.host = host;
-            return this;
-        }
-
-        /**
-         * Sets port.
-         *
-         * @param port port, default value is 6379
-         * @return Builder itself
-         */
-        public Builder setPort(int port) {
-            this.port = port;
-            return this;
-        }
-
-        /**
-         * Sets timeout.
-         *
-         * @param timeout timeout, default value is 2000
-         * @return Builder itself
-         */
-        public Builder setTimeout(int timeout) {
-            this.timeout = timeout;
-            return this;
-        }
-
-        /**
-         * Sets database index.
-         *
-         * @param database database index, default value is 0
-         * @return Builder itself
-         */
-        public Builder setDatabase(int database) {
-            this.database = database;
-            return this;
-        }
-
-        /**
-         * Sets password.
-         *
-         * @param password password, if any
-         * @return Builder itself
-         */
-        public Builder setPassword(String password) {
-            this.password = password;
-            return this;
-        }
-
-        /**
-         * Sets the {@code maxTotal} attribute for pools created with the resulting configuration.
-         *
-         * @param maxTotal the maximum number of objects that can be allocated by the pool, default value is 8
-         * @return Builder itself
-         */
-        public Builder setMaxTotal(int maxTotal) {
-            this.maxTotal = maxTotal;
-            return this;
-        }
-
-        /**
-         * Sets the {@code maxIdle} attribute for pools created with the resulting configuration.
-         *
-         * @param maxIdle the cap on the number of "idle" instances in the pool, default value is 8
-         * @return Builder itself
-         */
-        public Builder setMaxIdle(int maxIdle) {
-            this.maxIdle = maxIdle;
-            return this;
-        }
-
-        /**
-         * Sets the {@code minIdle} attribute for pools created with the resulting configuration.
-         *
-         * @param minIdle the minimum number of idle objects to maintain in the pool, default value is 0
-         * @return Builder itself
-         */
-        public Builder setMinIdle(int minIdle) {
-            this.minIdle = minIdle;
-            return this;
-        }
-
-        /**
-         * Creates a {@link FlinkJedisPoolConfig} from the values collected by this builder.
-         *
-         * @return a new {@code FlinkJedisPoolConfig} instance
-         */
-        public FlinkJedisPoolConfig build() {
-            return new FlinkJedisPoolConfig(host, port, timeout, password, database, maxTotal, maxIdle, minIdle);
-        }
-    }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/8038ae4c/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisSentinelConfig.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisSentinelConfig.java b/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisSentinelConfig.java
deleted file mode 100644
index 2cdb397..0000000
--- a/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisSentinelConfig.java
+++ /dev/null
@@ -1,259 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.redis.common.config;
-
-import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
-import org.apache.flink.util.Preconditions;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import redis.clients.jedis.Protocol;
-
-import java.util.HashSet;
-import java.util.Set;
-
-/**
- * Configuration for a Jedis Sentinel pool.
- *
- * <p>Instances are created through the nested {@link Builder}. The set of sentinels is copied
- * on the way in and on the way out, so the configuration cannot be modified after construction.
- */
-public class FlinkJedisSentinelConfig extends FlinkJedisConfigBase {
-    private static final long serialVersionUID = 1L;
-
-    private static final Logger LOG = LoggerFactory.getLogger(FlinkJedisSentinelConfig.class);
-
-    private final String masterName;
-    private final Set<String> sentinels;
-    private final int soTimeout;
-    private final String password;
-    private final int database;
-
-    /**
-     * Jedis Sentinels config.
-     * The master name and the sentinels are mandatory.
-     *
-     * @param masterName master name of the replica set
-     * @param sentinels set of sentinel hosts
-     * @param connectionTimeout connection timeout
-     * @param soTimeout socket timeout
-     * @param password password, if any
-     * @param database database index
-     * @param maxTotal the maximum number of objects that can be allocated by the pool
-     * @param maxIdle the cap on the number of "idle" instances in the pool
-     * @param minIdle the minimum number of idle objects to maintain in the pool
-     *
-     * @throws NullPointerException if {@code masterName} or {@code sentinels} is {@code null}
-     * @throws IllegalArgumentException if {@code sentinels} is empty, or if {@code soTimeout}
-     *         or any of the values checked by the base class is negative
-     */
-    private FlinkJedisSentinelConfig(String masterName, Set<String> sentinels,
-            int connectionTimeout, int soTimeout,
-            String password, int database,
-            int maxTotal, int maxIdle, int minIdle) {
-        super(connectionTimeout, maxTotal, maxIdle, minIdle);
-        Preconditions.checkNotNull(masterName, "Master name should be presented");
-        Preconditions.checkNotNull(sentinels, "Sentinels information should be presented");
-        Preconditions.checkArgument(!sentinels.isEmpty(), "Sentinel hosts should not be empty");
-        // Fail fast: a negative socket timeout would otherwise only be rejected when a socket is opened.
-        Preconditions.checkArgument(soTimeout >= 0, "socket timeout can not be negative");
-
-        this.masterName = masterName;
-        // Defensive copy, later changes to the caller's set must not leak into this configuration.
-        this.sentinels = new HashSet<>(sentinels);
-        this.soTimeout = soTimeout;
-        this.password = password;
-        this.database = database;
-    }
-
-    /**
-     * Returns master name of the replica set.
-     *
-     * @return master name of the replica set.
-     */
-    public String getMasterName() {
-        return masterName;
-    }
-
-    /**
-     * Returns Sentinels host addresses.
-     *
-     * @return a new set holding the Sentinels host addresses; modifying it does not affect this configuration
-     */
-    public Set<String> getSentinels() {
-        // Hand out a copy so that callers cannot modify the internal state.
-        return new HashSet<>(sentinels);
-    }
-
-    /**
-     * Returns socket timeout.
-     *
-     * @return socket timeout
-     */
-    public int getSoTimeout() {
-        return soTimeout;
-    }
-
-    /**
-     * Returns password.
-     *
-     * @return password, may be {@code null} when none was configured
-     */
-    public String getPassword() {
-        return password;
-    }
-
-    /**
-     * Returns database index.
-     *
-     * @return database index
-     */
-    public int getDatabase() {
-        return database;
-    }
-
-    /**
-     * Builder for initializing {@link FlinkJedisSentinelConfig}.
-     */
-    public static class Builder {
-        private String masterName;
-        private Set<String> sentinels;
-        private int connectionTimeout = Protocol.DEFAULT_TIMEOUT;
-        private int soTimeout = Protocol.DEFAULT_TIMEOUT;
-        private String password;
-        private int database = Protocol.DEFAULT_DATABASE;
-        private int maxTotal = GenericObjectPoolConfig.DEFAULT_MAX_TOTAL;
-        private int maxIdle = GenericObjectPoolConfig.DEFAULT_MAX_IDLE;
-        private int minIdle = GenericObjectPoolConfig.DEFAULT_MIN_IDLE;
-
-        /**
-         * Sets master name of the replica set.
-         *
-         * @param masterName master name of the replica set
-         * @return Builder itself
-         */
-        public Builder setMasterName(String masterName) {
-            this.masterName = masterName;
-            return this;
-        }
-
-        /**
-         * Sets sentinels address.
-         *
-         * @param sentinels host set of the sentinels
-         * @return Builder itself
-         */
-        public Builder setSentinels(Set<String> sentinels) {
-            this.sentinels = sentinels;
-            return this;
-        }
-
-        /**
-         * Sets connection timeout.
-         *
-         * @param connectionTimeout connection timeout, default value is 2000
-         * @return Builder itself
-         */
-        public Builder setConnectionTimeout(int connectionTimeout) {
-            this.connectionTimeout = connectionTimeout;
-            return this;
-        }
-
-        /**
-         * Sets socket timeout.
-         *
-         * @param soTimeout socket timeout, default value is 2000
-         * @return Builder itself
-         */
-        public Builder setSoTimeout(int soTimeout) {
-            this.soTimeout = soTimeout;
-            return this;
-        }
-
-        /**
-         * Sets password.
-         *
-         * @param password password, if any
-         * @return Builder itself
-         */
-        public Builder setPassword(String password) {
-            this.password = password;
-            return this;
-        }
-
-        /**
-         * Sets database index.
-         *
-         * @param database database index, default value is 0
-         * @return Builder itself
-         */
-        public Builder setDatabase(int database) {
-            this.database = database;
-            return this;
-        }
-
-        /**
-         * Sets value for the {@code maxTotal} configuration attribute
-         * for pools to be created with this configuration instance.
-         *
-         * @param maxTotal the maximum number of objects that can be allocated by the pool, default value is 8
-         * @return Builder itself
-         */
-        public Builder setMaxTotal(int maxTotal) {
-            this.maxTotal = maxTotal;
-            return this;
-        }
-
-        /**
-         * Sets value for the {@code maxIdle} configuration attribute
-         * for pools to be created with this configuration instance.
-         *
-         * @param maxIdle the cap on the number of "idle" instances in the pool, default value is 8
-         * @return Builder itself
-         */
-        public Builder setMaxIdle(int maxIdle) {
-            this.maxIdle = maxIdle;
-            return this;
-        }
-
-        /**
-         * Sets value for the {@code minIdle} configuration attribute
-         * for pools to be created with this configuration instance.
-         *
-         * @param minIdle the minimum number of idle objects to maintain in the pool, default value is 0
-         * @return Builder itself
-         */
-        public Builder setMinIdle(int minIdle) {
-            this.minIdle = minIdle;
-            return this;
-        }
-
-        /**
-         * Creates a {@link FlinkJedisSentinelConfig} from the values collected by this builder.
-         *
-         * @return a new {@code FlinkJedisSentinelConfig} instance
-         * @throws NullPointerException if the master name or the sentinels were not set
-         * @throws IllegalArgumentException if the sentinels are empty or a numeric setting is negative
-         */
-        public FlinkJedisSentinelConfig build(){
-            return new FlinkJedisSentinelConfig(masterName, sentinels, connectionTimeout, soTimeout,
-                password, database, maxTotal, maxIdle, minIdle);
-        }
-    }
-
-    @Override
-    public String toString() {
-        // The password is intentionally left out of the textual form.
-        return "JedisSentinelConfig{" +
-            "masterName='" + masterName + '\'' +
-            ", sentinels=" + sentinels +
-            ", connectionTimeout=" + connectionTimeout +
-            ", soTimeout=" + soTimeout +
-            ", database=" + database +
-            ", maxTotal=" + maxTotal +
-            ", maxIdle=" + maxIdle +
-            ", minIdle=" + minIdle +
-            '}';
-    }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/8038ae4c/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java b/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java
deleted file mode 100644
index d6621d6..0000000
--- a/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java
+++ /dev/null
@@ -1,171 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.redis.common.container;
-
-import org.apache.flink.util.Preconditions;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import redis.clients.jedis.JedisCluster;
-
-import java.io.Closeable;
-import java.io.IOException;
-
-/**
- * Redis command container if we want to connect to a Redis cluster.
- */
-public class RedisClusterContainer implements RedisCommandsContainer, Closeable {
-
- private static final long serialVersionUID = 1L;
-
- private static final Logger LOG = LoggerFactory.getLogger(RedisClusterContainer.class);
-
- private transient JedisCluster jedisCluster;
-
- /**
- * Initialize Redis command container for Redis cluster.
- *
- * @param jedisCluster JedisCluster instance
- */
- public RedisClusterContainer(JedisCluster jedisCluster) {
- Preconditions.checkNotNull(jedisCluster, "Jedis cluster can not be null");
-
- this.jedisCluster = jedisCluster;
- }
-
- @Override
- public void open() throws Exception {
-
- // echo() tries to open a connection and echos back the
- // message passed as argument. Here we use it to monitor
- // if we can communicate with the cluster.
-
- jedisCluster.echo("Test");
- }
-
- @Override
- public void hset(final String key, final String hashField, final String value) {
- try {
- jedisCluster.hset(key, hashField, value);
- } catch (Exception e) {
- if (LOG.isErrorEnabled()) {
- LOG.error("Cannot send Redis message with command HSET to hash {} error message {}",
- key, hashField, e.getMessage());
- }
- throw e;
- }
- }
-
- @Override
- public void rpush(final String listName, final String value) {
- try {
- jedisCluster.rpush(listName, value);
- } catch (Exception e) {
- if (LOG.isErrorEnabled()) {
- LOG.error("Cannot send Redis message with command RPUSH to list {} error message: {}",
- listName, e.getMessage());
- }
- throw e;
- }
- }
-
- @Override
- public void lpush(String listName, String value) {
- try {
- jedisCluster.lpush(listName, value);
- } catch (Exception e) {
- if (LOG.isErrorEnabled()) {
- LOG.error("Cannot send Redis message with command LPUSH to list {} error message: {}",
- listName, e.getMessage());
- }
- throw e;
- }
- }
-
- @Override
- public void sadd(final String setName, final String value) {
- try {
- jedisCluster.sadd(setName, value);
- } catch (Exception e) {
- if (LOG.isErrorEnabled()) {
- LOG.error("Cannot send Redis message with command RPUSH to set {} error message {}",
- setName, e.getMessage());
- }
- throw e;
- }
- }
-
- @Override
- public void publish(final String channelName, final String message) {
- try {
- jedisCluster.publish(channelName, message);
- } catch (Exception e) {
- if (LOG.isErrorEnabled()) {
- LOG.error("Cannot send Redis message with command PUBLISH to channel {} error message {}",
- channelName, e.getMessage());
- }
- throw e;
- }
- }
-
- @Override
- public void set(final String key, final String value) {
- try {
- jedisCluster.set(key, value);
- } catch (Exception e) {
- if (LOG.isErrorEnabled()) {
- LOG.error("Cannot send Redis message with command SET to key {} error message {}",
- key, e.getMessage());
- }
- throw e;
- }
- }
-
- @Override
- public void pfadd(final String key, final String element) {
- try {
- jedisCluster.set(key, element);
- } catch (Exception e) {
- if (LOG.isErrorEnabled()) {
- LOG.error("Cannot send Redis message with command PFADD to key {} error message {}",
- key, e.getMessage());
- }
- throw e;
- }
- }
-
- @Override
- public void zadd(final String key, final String score, final String element) {
- try {
- jedisCluster.zadd(key, Double.valueOf(score), element);
- } catch (Exception e) {
- if (LOG.isErrorEnabled()) {
- LOG.error("Cannot send Redis message with command ZADD to set {} error message {}",
- key, e.getMessage());
- }
- throw e;
- }
- }
-
- /**
- * Closes the {@link JedisCluster}.
- */
- @Override
- public void close() throws IOException {
- this.jedisCluster.close();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/8038ae4c/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainer.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainer.java b/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainer.java
deleted file mode 100644
index 55dbfc2..0000000
--- a/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainer.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.redis.common.container;
-
-import java.io.IOException;
-import java.io.Serializable;
-
-/**
- * The container for all available Redis commands.
- *
- * <p>Implementations send each command to a concrete Redis environment. The container
- * has to be opened with {@link #open()} before use and closed with {@link #close()} afterwards.
- */
-public interface RedisCommandsContainer extends Serializable {
-
- /**
- * Open the Jedis container.
- *
- * @throws Exception if the instance can not be opened properly
- */
- void open() throws Exception;
-
- /**
- * Sets field in the hash stored at key to value.
- * If key does not exist, a new key holding a hash is created.
- * If field already exists in the hash, it is overwritten.
- *
- * @param key Hash name
- * @param hashField Hash field
- * @param value Hash value
- */
- void hset(String key, String hashField, String value);
-
- /**
- * Insert the specified value at the tail of the list stored at key.
- * If key does not exist, it is created as empty list before performing the push operation.
- *
- * @param listName Name of the List
- * @param value Value to be added
- */
- void rpush(String listName, String value);
-
- /**
- * Insert the specified value at the head of the list stored at key.
- * If key does not exist, it is created as empty list before performing the push operation.
- *
- * @param listName Name of the List
- * @param value Value to be added
- */
- void lpush(String listName, String value);
-
- /**
- * Add the specified member to the set stored at key.
- * A member that is already part of this set is ignored.
- * If key does not exist, a new set is created before adding the specified member.
- *
- * @param setName Name of the Set
- * @param value Value to be added
- */
- void sadd(String setName, String value);
-
- /**
- * Posts a message to the given channel.
- *
- * @param channelName Name of the channel to which data will be published
- * @param message the message
- */
- void publish(String channelName, String message);
-
- /**
- * Set key to hold the string value. If key already holds a value, it is overwritten,
- * regardless of its type. Any previous time to live associated with the key is
- * discarded on successful SET operation.
- *
- * @param key the key name in which value to be set
- * @param value the value
- */
- void set(String key, String value);
-
- /**
- * Adds the given element to the HyperLogLog data structure
- * stored at the key specified as first argument.
- *
- * @param key The name of the key
- * @param element the element to be added
- */
- void pfadd(String key, String element);
-
- /**
- * Adds the specified member with the specified score to the sorted set stored at key.
- *
- * @param key The name of the Sorted Set
- * @param score Score of the element, given as the textual form of a number
- * @param element element to be added
- */
- void zadd(String key, String score, String element);
-
- /**
- * Close the Jedis container.
- *
- * @throws IOException if the instance can not be closed properly
- */
- void close() throws IOException;
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/8038ae4c/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilder.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilder.java b/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilder.java
deleted file mode 100644
index dc5396a..0000000
--- a/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilder.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.redis.common.container;
-
-import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
-import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisClusterConfig;
-import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisConfigBase;
-import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
-import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisSentinelConfig;
-import org.apache.flink.util.Preconditions;
-import redis.clients.jedis.JedisCluster;
-import redis.clients.jedis.JedisPool;
-import redis.clients.jedis.JedisSentinelPool;
-
-/**
- * The builder for {@link RedisCommandsContainer}.
- *
- * <p>Each supported configuration type has its own {@code build} overload; the overload taking
- * {@link FlinkJedisConfigBase} selects the right one based on the runtime type.
- */
-public class RedisCommandsContainerBuilder {
-
-    /**
-     * Initialize the {@link RedisCommandsContainer} based on the instance type.
-     *
-     * @param flinkJedisConfigBase configuration base
-     * @return container matching the concrete type of the configuration
-     * @throws IllegalArgumentException if the configuration is {@code null} or is neither a pool,
-     *         a cluster nor a sentinel configuration
-     */
-    public static RedisCommandsContainer build(FlinkJedisConfigBase flinkJedisConfigBase){
-        if (flinkJedisConfigBase instanceof FlinkJedisPoolConfig) {
-            return build((FlinkJedisPoolConfig) flinkJedisConfigBase);
-        }
-        if (flinkJedisConfigBase instanceof FlinkJedisClusterConfig) {
-            return build((FlinkJedisClusterConfig) flinkJedisConfigBase);
-        }
-        if (flinkJedisConfigBase instanceof FlinkJedisSentinelConfig) {
-            return build((FlinkJedisSentinelConfig) flinkJedisConfigBase);
-        }
-        throw new IllegalArgumentException("Jedis configuration not found");
-    }
-
-    /**
-     * Builds container for single Redis environment.
-     *
-     * @param jedisPoolConfig configuration for JedisPool
-     * @return container for single Redis environment
-     * @throws NullPointerException if jedisPoolConfig is null
-     */
-    public static RedisCommandsContainer build(FlinkJedisPoolConfig jedisPoolConfig) {
-        Preconditions.checkNotNull(jedisPoolConfig, "Redis pool config should not be Null");
-
-        final GenericObjectPoolConfig poolSettings = poolSettingsOf(jedisPoolConfig);
-        final JedisPool pool = new JedisPool(
-            poolSettings,
-            jedisPoolConfig.getHost(),
-            jedisPoolConfig.getPort(),
-            jedisPoolConfig.getConnectionTimeout(),
-            jedisPoolConfig.getPassword(),
-            jedisPoolConfig.getDatabase());
-        return new RedisContainer(pool);
-    }
-
-    /**
-     * Builds container for Redis Cluster environment.
-     *
-     * @param jedisClusterConfig configuration for JedisCluster
-     * @return container for Redis Cluster environment
-     * @throws NullPointerException if jedisClusterConfig is null
-     */
-    public static RedisCommandsContainer build(FlinkJedisClusterConfig jedisClusterConfig) {
-        Preconditions.checkNotNull(jedisClusterConfig, "Redis cluster config should not be Null");
-
-        final GenericObjectPoolConfig poolSettings = poolSettingsOf(jedisClusterConfig);
-        final JedisCluster cluster = new JedisCluster(
-            jedisClusterConfig.getNodes(),
-            jedisClusterConfig.getConnectionTimeout(),
-            jedisClusterConfig.getMaxRedirections(),
-            poolSettings);
-        return new RedisClusterContainer(cluster);
-    }
-
-    /**
-     * Builds container for Redis Sentinel environment.
-     *
-     * @param jedisSentinelConfig configuration for JedisSentinel
-     * @return container for Redis sentinel environment
-     * @throws NullPointerException if jedisSentinelConfig is null
-     */
-    public static RedisCommandsContainer build(FlinkJedisSentinelConfig jedisSentinelConfig) {
-        Preconditions.checkNotNull(jedisSentinelConfig, "Redis sentinel config should not be Null");
-
-        final GenericObjectPoolConfig poolSettings = poolSettingsOf(jedisSentinelConfig);
-        final JedisSentinelPool sentinelPool = new JedisSentinelPool(
-            jedisSentinelConfig.getMasterName(),
-            jedisSentinelConfig.getSentinels(),
-            poolSettings,
-            jedisSentinelConfig.getConnectionTimeout(),
-            jedisSentinelConfig.getSoTimeout(),
-            jedisSentinelConfig.getPassword(),
-            jedisSentinelConfig.getDatabase());
-        return new RedisContainer(sentinelPool);
-    }
-
-    /**
-     * Copies the maxIdle, maxTotal and minIdle values of the given configuration into a new pool configuration.
-     */
-    private static GenericObjectPoolConfig poolSettingsOf(FlinkJedisConfigBase config) {
-        final GenericObjectPoolConfig poolSettings = new GenericObjectPoolConfig();
-        poolSettings.setMaxIdle(config.getMaxIdle());
-        poolSettings.setMaxTotal(config.getMaxTotal());
-        poolSettings.setMinIdle(config.getMinIdle());
-        return poolSettings;
-    }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/8038ae4c/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java b/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java
deleted file mode 100644
index ba4bbda..0000000
--- a/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java
+++ /dev/null
@@ -1,252 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.redis.common.container;
-
-import org.apache.flink.util.Preconditions;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import redis.clients.jedis.Jedis;
-import redis.clients.jedis.JedisPool;
-import redis.clients.jedis.JedisSentinelPool;
-
-import java.io.Closeable;
-import java.io.IOException;
-
-/**
- * Redis command container for a single Redis server or for a Redis Sentinel setup.
- *
- * <p>To connect to a single Redis server, use {@link #RedisContainer(JedisPool)}.
- * To connect through Redis sentinels, use {@link #RedisContainer(JedisSentinelPool)}.
- *
- * <p>Exactly one of the two pools is set per instance. Every command borrows a {@link Jedis}
- * instance from that pool and closes it again once the command has finished, whether or not
- * the command succeeded.
- */
-public class RedisContainer implements RedisCommandsContainer, Closeable {
-
- private static final long serialVersionUID = 1L;
-
- private static final Logger LOG = LoggerFactory.getLogger(RedisContainer.class);
-
- // Only one of the two pools is non-null; see the constructors.
- private transient JedisPool jedisPool;
- private transient JedisSentinelPool jedisSentinelPool;
-
- /**
- * Use this constructor to connect to a single Redis server.
- *
- * @param jedisPool JedisPool which actually manages Jedis instances
- */
- public RedisContainer(JedisPool jedisPool) {
- Preconditions.checkNotNull(jedisPool, "Jedis Pool can not be null");
- this.jedisPool = jedisPool;
- this.jedisSentinelPool = null;
- }
-
- /**
- * Use this constructor if the Redis environment is clustered with sentinels.
- *
- * @param sentinelPool SentinelPool which actually manages Jedis instances
- */
- public RedisContainer(final JedisSentinelPool sentinelPool) {
- Preconditions.checkNotNull(sentinelPool, "Jedis Sentinel Pool can not be null");
- this.jedisPool = null;
- this.jedisSentinelPool = sentinelPool;
- }
-
- /**
- * Closes the pool this container was created with.
- *
- * @throws IOException declared by {@link Closeable#close()}
- */
- @Override
- public void close() throws IOException {
- if (this.jedisPool != null) {
- this.jedisPool.close();
- }
- if (this.jedisSentinelPool != null) {
- this.jedisSentinelPool.close();
- }
- }
-
- /**
- * Checks that Redis can be reached by sending an ECHO command.
- *
- * @throws Exception if no instance can be obtained from the pool or the ECHO command fails
- */
- @Override
- public void open() throws Exception {
- // echo() tries to open a connection and echoes back the message passed as argument.
- // Here we use it to check that we can communicate with Redis. The borrowed instance
- // is released in the finally block so the connectivity check does not leak it.
- Jedis jedis = null;
- try {
- jedis = getInstance();
- jedis.echo("Test");
- } finally {
- releaseInstance(jedis);
- }
- }
-
- /**
- * Sends HSET for the given key, hash field and value.
- * Failures are logged and rethrown to the caller.
- */
- @Override
- public void hset(final String key, final String hashField, final String value) {
- Jedis jedis = null;
- try {
- jedis = getInstance();
- jedis.hset(key, hashField, value);
- } catch (Exception e) {
- if (LOG.isErrorEnabled()) {
- LOG.error("Cannot send Redis message with command HSET to key {} and hashField {} error message {}",
- key, hashField, e.getMessage());
- }
- throw e;
- } finally {
- releaseInstance(jedis);
- }
- }
-
- /**
- * Sends RPUSH for the given list and value.
- * Failures are logged and rethrown to the caller.
- */
- @Override
- public void rpush(final String listName, final String value) {
- Jedis jedis = null;
- try {
- jedis = getInstance();
- jedis.rpush(listName, value);
- } catch (Exception e) {
- if (LOG.isErrorEnabled()) {
- LOG.error("Cannot send Redis message with command RPUSH to list {} error message {}",
- listName, e.getMessage());
- }
- throw e;
- } finally {
- releaseInstance(jedis);
- }
- }
-
- /**
- * Sends LPUSH for the given list and value.
- * Failures are logged and rethrown to the caller.
- */
- @Override
- public void lpush(String listName, String value) {
- Jedis jedis = null;
- try {
- jedis = getInstance();
- jedis.lpush(listName, value);
- } catch (Exception e) {
- if (LOG.isErrorEnabled()) {
- LOG.error("Cannot send Redis message with command LPUSH to list {} error message {}",
- listName, e.getMessage());
- }
- throw e;
- } finally {
- releaseInstance(jedis);
- }
- }
-
- /**
- * Sends SADD for the given set and value.
- * Failures are logged and rethrown to the caller.
- */
- @Override
- public void sadd(final String setName, final String value) {
- Jedis jedis = null;
- try {
- jedis = getInstance();
- jedis.sadd(setName, value);
- } catch (Exception e) {
- if (LOG.isErrorEnabled()) {
- LOG.error("Cannot send Redis message with command SADD to set {} error message {}",
- setName, e.getMessage());
- }
- throw e;
- } finally {
- releaseInstance(jedis);
- }
- }
-
- /**
- * Sends PUBLISH for the given channel and message.
- * Failures are logged and rethrown to the caller.
- */
- @Override
- public void publish(final String channelName, final String message) {
- Jedis jedis = null;
- try {
- jedis = getInstance();
- jedis.publish(channelName, message);
- } catch (Exception e) {
- if (LOG.isErrorEnabled()) {
- LOG.error("Cannot send Redis message with command PUBLISH to channel {} error message {}",
- channelName, e.getMessage());
- }
- throw e;
- } finally {
- releaseInstance(jedis);
- }
- }
-
- /**
- * Sends SET for the given key and value.
- * Failures are logged and rethrown to the caller.
- */
- @Override
- public void set(final String key, final String value) {
- Jedis jedis = null;
- try {
- jedis = getInstance();
- jedis.set(key, value);
- } catch (Exception e) {
- if (LOG.isErrorEnabled()) {
- LOG.error("Cannot send Redis message with command SET to key {} error message {}",
- key, e.getMessage());
- }
- throw e;
- } finally {
- releaseInstance(jedis);
- }
- }
-
- /**
- * Sends PFADD for the given key and element.
- * Failures are logged and rethrown to the caller.
- */
- @Override
- public void pfadd(final String key, final String element) {
- Jedis jedis = null;
- try {
- jedis = getInstance();
- jedis.pfadd(key, element);
- } catch (Exception e) {
- if (LOG.isErrorEnabled()) {
- LOG.error("Cannot send Redis message with command PFADD to key {} error message {}",
- key, e.getMessage());
- }
- throw e;
- } finally {
- releaseInstance(jedis);
- }
- }
-
- /**
- * Sends ZADD for the given key, score and element. The score is parsed with
- * {@link Double#valueOf(String)}; a score that cannot be parsed is logged and rethrown
- * like any other failure.
- */
- @Override
- public void zadd(final String key, final String score, final String element) {
- Jedis jedis = null;
- try {
- jedis = getInstance();
- jedis.zadd(key, Double.valueOf(score), element);
- } catch (Exception e) {
- if (LOG.isErrorEnabled()) {
- LOG.error("Cannot send Redis message with command ZADD to set {} error message {}",
- key, e.getMessage());
- }
- throw e;
- } finally {
- releaseInstance(jedis);
- }
- }
-
- /**
- * Returns a Jedis instance from the pool. The sentinel pool is used when it is set,
- * otherwise the plain pool.
- *
- * @return the Jedis instance
- */
- private Jedis getInstance() {
- if (jedisSentinelPool != null) {
- return jedisSentinelPool.getResource();
- } else {
- return jedisPool.getResource();
- }
- }
-
- /**
- * Closes the jedis instance after finishing the command. A failure to close is only
- * logged so that it does not hide the outcome of the command itself.
- *
- * @param jedis The jedis instance, may be {@code null} if none was obtained
- */
- private void releaseInstance(final Jedis jedis) {
- if (jedis == null) {
- return;
- }
- try {
- jedis.close();
- } catch (Exception e) {
- LOG.error("Failed to close (return) instance to pool", e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/8038ae4c/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommand.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommand.java b/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommand.java
deleted file mode 100644
index b0661c7..0000000
--- a/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommand.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.redis.common.mapper;
-
-/**
- * All available commands for Redis. Each command belongs to a {@link RedisDataType} group.
- */
-public enum RedisCommand {
-
- /**
- * Insert the specified value at the head of the list stored at key.
- * If key does not exist, it is created as empty list before performing the push operations.
- */
- LPUSH(RedisDataType.LIST),
-
- /**
- * Insert the specified value at the tail of the list stored at key.
- * If key does not exist, it is created as empty list before performing the push operation.
- */
- RPUSH(RedisDataType.LIST),
-
- /**
- * Add the specified member to the set stored at key.
- * Specified member that is already a member of this set is ignored.
- */
- SADD(RedisDataType.SET),
-
- /**
- * Set key to hold the string value. If key already holds a value,
- * it is overwritten, regardless of its type.
- */
- SET(RedisDataType.STRING),
-
- /**
- * Adds the element to the HyperLogLog data structure stored at the variable name specified as first argument.
- */
- PFADD(RedisDataType.HYPER_LOG_LOG),
-
- /**
- * Posts a message to the given channel.
- */
- PUBLISH(RedisDataType.PUBSUB),
-
- /**
- * Adds the specified members with the specified score to the sorted set stored at key.
- */
- ZADD(RedisDataType.SORTED_SET),
-
- /**
- * Sets field in the hash stored at key to value. If key does not exist,
- * a new key holding a hash is created. If field already exists in the hash, it is overwritten.
- */
- HSET(RedisDataType.HASH);
-
- /**
- * The {@link RedisDataType} this command belongs to. Fixed per constant, hence final.
- */
- private final RedisDataType redisDataType;
-
- RedisCommand(RedisDataType redisDataType) {
- this.redisDataType = redisDataType;
- }
-
- /**
- * Returns the {@link RedisDataType} this command belongs to.
- *
- * @return the {@link RedisDataType}
- */
- public RedisDataType getRedisDataType() {
- return redisDataType;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/8038ae4c/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommandDescription.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommandDescription.java b/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommandDescription.java
deleted file mode 100644
index 1eea48a..0000000
--- a/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommandDescription.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.redis.common.mapper;
-
-import org.apache.flink.util.Preconditions;
-
-import java.io.Serializable;
-
-/**
- * The description of the command type. This must be passed while creating new {@link RedisMapper}.
- *
- * <p>When creating a descriptor for a command in the group {@link RedisDataType#HASH} or
- * {@link RedisDataType#SORTED_SET}, use {@link #RedisCommandDescription(RedisCommand, String)}.
- * If the {@code additionalKey} is {@code null} it will throw {@code IllegalArgumentException}.
- *
- * <p>When the {@link RedisCommand} is not in the group {@link RedisDataType#HASH} or
- * {@link RedisDataType#SORTED_SET}, you can use {@link #RedisCommandDescription(RedisCommand)}.
- */
-public class RedisCommandDescription implements Serializable {
-
- private static final long serialVersionUID = 1L;
-
- private final RedisCommand redisCommand;
-
- /**
- * This additional key is needed for the group {@link RedisDataType#HASH} and {@link RedisDataType#SORTED_SET}.
- * Other {@link RedisDataType} works only with two variable i.e. name of the list and value to be added.
- * But for {@link RedisDataType#HASH} and {@link RedisDataType#SORTED_SET} we need three variables.
- * <p>For {@link RedisDataType#HASH} we need hash name, hash key and element.
- * {@link #getAdditionalKey()} used as hash name for {@link RedisDataType#HASH}
- * <p>For {@link RedisDataType#SORTED_SET} we need set name, the element and its score.
- * {@link #getAdditionalKey()} used as set name for {@link RedisDataType#SORTED_SET}
- */
- private final String additionalKey;
-
- /**
- * Use this constructor when data type is {@link RedisDataType#HASH} or {@link RedisDataType#SORTED_SET}.
- * For any other data type {@code additionalKey} is not validated; it is stored as given.
- *
- * @param redisCommand the redis command type {@link RedisCommand}
- * @param additionalKey additional key for Hash and Sorted set data type
- * @throws NullPointerException if {@code redisCommand} is {@code null}
- * @throws IllegalArgumentException if the command belongs to {@link RedisDataType#HASH} or
- * {@link RedisDataType#SORTED_SET} and {@code additionalKey} is {@code null}
- */
- public RedisCommandDescription(RedisCommand redisCommand, String additionalKey) {
- Preconditions.checkNotNull(redisCommand, "Redis command type can not be null");
-
- // Validate before assigning so that no field is written for a rejected argument.
- final RedisDataType dataType = redisCommand.getRedisDataType();
- final boolean needsAdditionalKey = dataType == RedisDataType.HASH || dataType == RedisDataType.SORTED_SET;
- if (needsAdditionalKey && additionalKey == null) {
- throw new IllegalArgumentException("Hash and Sorted Set should have additional key");
- }
-
- this.redisCommand = redisCommand;
- this.additionalKey = additionalKey;
- }
-
- /**
- * Use this constructor when command type is not in group {@link RedisDataType#HASH} or {@link RedisDataType#SORTED_SET}.
- *
- * @param redisCommand the redis data type {@link RedisCommand}
- * @throws NullPointerException if {@code redisCommand} is {@code null}
- * @throws IllegalArgumentException if the command belongs to {@link RedisDataType#HASH} or
- * {@link RedisDataType#SORTED_SET}, because those require an additional key
- */
- public RedisCommandDescription(RedisCommand redisCommand) {
- this(redisCommand, null);
- }
-
- /**
- * Returns the {@link RedisCommand}.
- *
- * @return the command type of the mapping
- */
- public RedisCommand getCommand() {
- return redisCommand;
- }
-
- /**
- * Returns the additional key, which is required when the data type is {@link RedisDataType#HASH}
- * or {@link RedisDataType#SORTED_SET}.
- *
- * @return the additional key as passed to the constructor, possibly {@code null} for other data types
- */
- public String getAdditionalKey() {
- return additionalKey;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/8038ae4c/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisDataType.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisDataType.java b/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisDataType.java
deleted file mode 100644
index 6e3997c..0000000
--- a/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisDataType.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.redis.common.mapper;
-
-/**
- * All available data types for Redis.
- */
-public enum RedisDataType {
-
- /**
- * Strings are the most basic kind of Redis value. Redis Strings are binary safe,
- * which means that a Redis string can contain any kind of data, for instance a JPEG image or a serialized Ruby object.
- * A String value can be at most 512 Megabytes in length.
- */
- STRING,
-
- /**
- * Redis Hashes are maps between string fields and string values.
- */
- HASH,
-
- /**
- * Redis Lists are simply lists of strings, sorted by insertion order.
- */
- LIST,
-
- /**
- * Redis Sets are an unordered collection of Strings.
- */
- SET,
-
- /**
- * Redis Sorted Sets are, similarly to Redis Sets, non-repeating collections of Strings.
- * The difference is that every member of a Sorted Set is associated with a score,
- * which is used to keep the sorted set ordered, from the smallest to the greatest score.
- * While members are unique, scores may be repeated.
- */
- SORTED_SET,
-
- /**
- * HyperLogLog is a probabilistic data structure used in order to count unique things.
- */
- HYPER_LOG_LOG,
-
- /**
- * Redis implementation of the publish and subscribe paradigm. Published messages are categorized into channels,
- * without knowledge of what (if any) subscribers there may be.
- * Subscribers express interest in one or more channels, and only receive messages
- * that are of interest, without knowledge of what (if any) publishers there are.
- */
- PUBSUB
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/8038ae4c/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisMapper.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisMapper.java b/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisMapper.java
deleted file mode 100644
index 63fed19..0000000
--- a/flink-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisMapper.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.redis.common.mapper;
-
-import org.apache.flink.api.common.functions.Function;
-
-import java.io.Serializable;
-
-/**
- * Function that creates the description how the input data should be mapped to redis type.
- *<p>Example:
- *<pre>{@code
- *private static class RedisTestMapper implements RedisMapper<Tuple2<String, String>> {
- * public RedisCommandDescription getCommandDescription() {
- * return new RedisCommandDescription(RedisCommand.PUBLISH);
- * }
- * public String getKeyFromData(Tuple2<String, String> data) {
- * return data.f0;
- * }
- * public String getValueFromData(Tuple2<String, String> data) {
- * return data.f1;
- * }
- *}
- *}</pre>
- *
- * @param <T> The type of the element handled by this {@code RedisMapper}
- */
-public interface RedisMapper<T> extends Function, Serializable {
-
- /**
- * Returns the descriptor which defines the Redis command (and thereby the data type) to use.
- *
- * @return command descriptor
- */
- RedisCommandDescription getCommandDescription();
-
- /**
- * Extracts key from data.
- *
- * @param data source data
- * @return key
- */
- String getKeyFromData(T data);
-
- /**
- * Extracts value from data.
- *
- * @param data source data
- * @return value
- */
- String getValueFromData(T data);
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/8038ae4c/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisITCaseBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisITCaseBase.java b/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisITCaseBase.java
deleted file mode 100644
index 7d98f2d..0000000
--- a/flink-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisITCaseBase.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.redis;
-
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import redis.embedded.RedisServer;
-
-import java.io.IOException;
-
-import static org.apache.flink.util.NetUtils.getAvailablePort;
-
-/**
- * Base class for Redis integration tests. Starts one embedded {@link RedisServer} on
- * {@link #REDIS_HOST}:{@link #REDIS_PORT} before the tests of a class run and stops it afterwards.
- */
-public abstract class RedisITCaseBase extends StreamingMultipleProgramsTestBase {
-
- public static final int REDIS_PORT = getAvailablePort();
- public static final String REDIS_HOST = "127.0.0.1";
-
- private static RedisServer redisServer;
-
- /**
- * Creates and starts the embedded Redis server on {@link #REDIS_PORT}.
- */
- @BeforeClass
- public static void createRedisServer() throws IOException, InterruptedException {
- redisServer = new RedisServer(REDIS_PORT);
- redisServer.start();
- }
-
- /**
- * Stops the embedded Redis server if one was created.
- */
- @AfterClass
- public static void stopRedisServer() {
- // The server is still null when its creation failed in createRedisServer(); skip the
- // stop in that case so a NullPointerException here does not obscure the setup failure.
- if (redisServer != null) {
- redisServer.stop();
- }
- }
-}