You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2016/08/18 14:42:34 UTC
flink git commit: [hotfix] Fix Redis Sink to fail at opening if Redis
is not initialized.
Repository: flink
Updated Branches:
refs/heads/master 47acdeadf -> e9a067229
[hotfix] Fix Redis Sink to fail at opening if Redis is not initialized.
This closes #2245
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e9a06722
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e9a06722
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e9a06722
Branch: refs/heads/master
Commit: e9a067229d787c3e874dc40303ac8661045cc54f
Parents: 47acdea
Author: kl0u <kk...@gmail.com>
Authored: Wed Jul 13 18:40:17 2016 +0200
Committer: Robert Metzger <rm...@apache.org>
Committed: Thu Aug 18 16:41:53 2016 +0200
----------------------------------------------------------------------
.../streaming/connectors/redis/RedisSink.java | 8 +-
.../common/container/RedisClusterContainer.java | 12 ++-
.../container/RedisCommandsContainer.java | 7 ++
.../redis/common/container/RedisContainer.java | 17 ++++-
.../connectors/redis/RedisSinkITCase.java | 2 -
.../connectors/redis/RedisSinkTest.java | 79 ++++++++++++++++++++
6 files changed, 117 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/e9a06722/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java b/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java
index 43518e8..f6b0fd7 100644
--- a/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java
+++ b/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java
@@ -166,7 +166,13 @@ public class RedisSink<IN> extends RichSinkFunction<IN> {
*/
@Override
public void open(Configuration parameters) throws Exception {
- this.redisCommandsContainer = RedisCommandsContainerBuilder.build(this.flinkJedisConfigBase);
+ try {
+ this.redisCommandsContainer = RedisCommandsContainerBuilder.build(this.flinkJedisConfigBase);
+ this.redisCommandsContainer.open(); // probe the connection now, so that an unreachable Redis makes open() itself fail
+ } catch (Exception e) {
+ LOG.error("Redis has not been properly initialized: ", e);
+ throw e; // rethrow after logging so that the failure still reaches the caller of open()
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/e9a06722/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java b/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java
index 7551c9e..d6621d6 100644
--- a/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java
+++ b/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java
@@ -33,7 +33,7 @@ public class RedisClusterContainer implements RedisCommandsContainer, Closeable
private static final Logger LOG = LoggerFactory.getLogger(RedisClusterContainer.class);
- private JedisCluster jedisCluster;
+ private transient JedisCluster jedisCluster;
/**
* Initialize Redis command container for Redis cluster.
@@ -47,6 +47,16 @@ public class RedisClusterContainer implements RedisCommandsContainer, Closeable
}
@Override
+ public void open() throws Exception {
+
+ // echo() tries to open a connection and echoes back the
+ // message passed as argument. Here we use it to check
+ // whether we can communicate with the cluster.
+
+ jedisCluster.echo("Test");
+ }
+
+ @Override
public void hset(final String key, final String hashField, final String value) {
try {
jedisCluster.hset(key, hashField, value);
http://git-wip-us.apache.org/repos/asf/flink/blob/e9a06722/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainer.java b/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainer.java
index 1b92c2e..55dbfc2 100644
--- a/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainer.java
+++ b/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainer.java
@@ -25,6 +25,13 @@ import java.io.Serializable;
public interface RedisCommandsContainer extends Serializable {
/**
+ * Opens the Jedis container.
+ *
+ * @throws Exception if the instance cannot be opened properly
+ */
+ void open() throws Exception;
+
+ /**
* Sets field in the hash stored at key to value.
* If key does not exist, a new key holding a hash is created.
* If field already exists in the hash, it is overwritten.
http://git-wip-us.apache.org/repos/asf/flink/blob/e9a06722/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java b/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java
index 8684e9a..ba4bbda 100644
--- a/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java
+++ b/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java
@@ -29,7 +29,7 @@ import java.io.IOException;
/**
* Redis command container if we want to connect to a single Redis server or to Redis sentinels.
* If you want to connect to a single Redis server, please use the first constructor {@link #RedisContainer(JedisPool)}.
- * If want to connect to a Redis sentinels, Please use the second constructor {@link #RedisContainer(JedisSentinelPool)}
+ * If you want to connect to Redis sentinels, please use the second constructor {@link #RedisContainer(JedisSentinelPool)}.
*/
public class RedisContainer implements RedisCommandsContainer, Closeable {
@@ -37,9 +37,8 @@ public class RedisContainer implements RedisCommandsContainer, Closeable {
private static final Logger LOG = LoggerFactory.getLogger(RedisContainer.class);
- private final JedisPool jedisPool;
- private final JedisSentinelPool jedisSentinelPool;
-
+ private transient JedisPool jedisPool;
+ private transient JedisSentinelPool jedisSentinelPool;
/**
* Use this constructor if to connect with single Redis server.
@@ -77,6 +76,16 @@ public class RedisContainer implements RedisCommandsContainer, Closeable {
}
@Override
+ public void open() throws Exception {
+
+     // echo() tries to open a connection and echoes back the
+     // message passed as argument. Here we use it to check
+     // whether we can communicate with the Redis server
+     // (or with the master resolved through the sentinels).
+     //
+     // The Jedis handed out by getInstance() is only needed for this
+     // probe, so it is closed again right away (also when echo() throws)
+     // instead of being left checked out for the lifetime of the sink.
+
+     try (Jedis probe = getInstance()) {
+         probe.echo("Test");
+     }
+ }
+
+ @Override
public void hset(final String key, final String hashField, final String value) {
Jedis jedis = null;
try {
http://git-wip-us.apache.org/repos/asf/flink/blob/e9a06722/flink-streaming-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkITCase.java b/flink-streaming-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkITCase.java
index 237d9e5..21f3cca 100644
--- a/flink-streaming-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkITCase.java
+++ b/flink-streaming-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkITCase.java
@@ -20,7 +20,6 @@ import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisClusterConfig;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
@@ -31,7 +30,6 @@ import org.junit.Test;
import redis.clients.jedis.Jedis;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
public class RedisSinkITCase extends RedisITCaseBase {
http://git-wip-us.apache.org/repos/asf/flink/blob/e9a06722/flink-streaming-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkTest.java b/flink-streaming-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkTest.java
index 848af57..59f59f2 100644
--- a/flink-streaming-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkTest.java
+++ b/flink-streaming-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkTest.java
@@ -17,12 +17,23 @@
package org.apache.flink.streaming.connectors.redis;
import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisClusterConfig;
+import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisConfigBase;
+import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
+import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisSentinelConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
import org.apache.flink.util.TestLogger;
import org.junit.Test;
+import redis.clients.jedis.exceptions.JedisConnectionException;
+
+import java.net.InetSocketAddress;
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.junit.Assert.fail;
public class RedisSinkTest extends TestLogger {
@@ -41,6 +52,74 @@ public class RedisSinkTest extends TestLogger {
new RedisSink<>(null, new TestMapper(new RedisCommandDescription(RedisCommand.LPUSH)));
}
+ @Test
+ public void testRedisDownBehavior() throws Exception {
+     // Deliberately wrong single-server configuration, so that open() fails.
+     final FlinkJedisConfigBase brokenPoolConfig = new FlinkJedisPoolConfig.Builder()
+         .setHost("127.0.0.1")
+         .setPort(1234)
+         .build();
+
+     testDownBehavior(brokenPoolConfig);
+ }
+
+ @Test
+ public void testRedisClusterDownBehavior() throws Exception {
+     // The only node handed to the cluster configuration.
+     final Set<InetSocketAddress> clusterNodes = new HashSet<>();
+     clusterNodes.add(new InetSocketAddress("127.0.0.1", 1234));
+
+     // Deliberately wrong cluster configuration, so that open() fails.
+     final FlinkJedisConfigBase brokenClusterConfig = new FlinkJedisClusterConfig.Builder()
+         .setNodes(clusterNodes)
+         .setTimeout(100)
+         .setMaxIdle(1)
+         .setMaxTotal(1)
+         .setMinIdle(1)
+         .build();
+
+     testDownBehavior(brokenClusterConfig);
+ }
+
+ @Test
+ public void testRedisSentinelDownBehavior() throws Exception {
+     // The only sentinel address handed to the sentinel configuration.
+     final Set<String> sentinelAddresses = new HashSet<>();
+     sentinelAddresses.add("localhost:55095");
+
+     // Deliberately wrong sentinel configuration, so that open() fails.
+     final FlinkJedisConfigBase brokenSentinelConfig = new FlinkJedisSentinelConfig.Builder()
+         .setMasterName("master")
+         .setSentinels(sentinelAddresses)
+         .build();
+
+     testDownBehavior(brokenSentinelConfig);
+ }
+
+ /**
+  * Opens a {@link RedisSink} built from the given configuration and verifies that
+  * {@code open()} fails with a (possibly nested) {@link JedisConnectionException}.
+  *
+  * @param config configuration that is expected to make {@code open()} fail
+  * @throws Exception the exception thrown by {@code open()} when no
+  *                   {@link JedisConnectionException} is found in its cause chain
+  */
+ private void testDownBehavior(FlinkJedisConfigBase config) throws Exception {
+     RedisSink<Tuple2<String, String>> redisSink = new RedisSink<>(config,
+         new RedisSinkITCase.RedisCommandMapper(RedisCommand.SADD));
+
+     try {
+         redisSink.open(new Configuration());
+     } catch (Exception e) {
+
+         // search for nested JedisConnectionExceptions
+         // because this is the expected behavior
+
+         Throwable t = e;
+         int depth = 0;
+         while (!(t instanceof JedisConnectionException)) {
+             t = t.getCause();
+             // give up at the end of the cause chain, or after 20 levels
+             // to guard against cyclic causes, and surface the original error
+             if (t == null || depth++ == 20) {
+                 throw e;
+             }
+         }
+
+         // open() failed with the expected exception
+         return;
+     }
+
+     // Without this, the test would silently pass when open() succeeds,
+     // i.e. exactly when the fail-fast behavior under test is broken.
+     fail("RedisSink.open() was expected to fail with a JedisConnectionException, but it succeeded.");
+ }
+
private class TestMapper implements RedisMapper<Tuple2<String, String>>{
private RedisCommandDescription redisCommandDescription;