You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2016/08/18 14:42:34 UTC

flink git commit: [hotfix] Fix Redis Sink to fail at opening if Redis is not initialized.

Repository: flink
Updated Branches:
  refs/heads/master 47acdeadf -> e9a067229


[hotfix] Fix Redis Sink to fail at opening if Redis is not initialized.

This closes #2245


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e9a06722
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e9a06722
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e9a06722

Branch: refs/heads/master
Commit: e9a067229d787c3e874dc40303ac8661045cc54f
Parents: 47acdea
Author: kl0u <kk...@gmail.com>
Authored: Wed Jul 13 18:40:17 2016 +0200
Committer: Robert Metzger <rm...@apache.org>
Committed: Thu Aug 18 16:41:53 2016 +0200

----------------------------------------------------------------------
 .../streaming/connectors/redis/RedisSink.java   |  8 +-
 .../common/container/RedisClusterContainer.java | 12 ++-
 .../container/RedisCommandsContainer.java       |  7 ++
 .../redis/common/container/RedisContainer.java  | 17 ++++-
 .../connectors/redis/RedisSinkITCase.java       |  2 -
 .../connectors/redis/RedisSinkTest.java         | 79 ++++++++++++++++++++
 6 files changed, 117 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e9a06722/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java b/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java
index 43518e8..f6b0fd7 100644
--- a/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java
+++ b/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java
@@ -166,7 +166,13 @@ public class RedisSink<IN> extends RichSinkFunction<IN> {
      */
 	@Override
 	public void open(Configuration parameters) throws Exception {
-		this.redisCommandsContainer = RedisCommandsContainerBuilder.build(this.flinkJedisConfigBase);
+		try {
+			this.redisCommandsContainer = RedisCommandsContainerBuilder.build(this.flinkJedisConfigBase);
+			this.redisCommandsContainer.open();
+		} catch (Exception e) {
+			LOG.error("Redis has not been properly initialized: ", e);
+			throw e;
+		}
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/e9a06722/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java b/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java
index 7551c9e..d6621d6 100644
--- a/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java
+++ b/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java
@@ -33,7 +33,7 @@ public class RedisClusterContainer implements RedisCommandsContainer, Closeable
 
 	private static final Logger LOG = LoggerFactory.getLogger(RedisClusterContainer.class);
 
-	private JedisCluster jedisCluster;
+	private transient JedisCluster jedisCluster;
 
 	/**
 	 * Initialize Redis command container for Redis cluster.
@@ -47,6 +47,16 @@ public class RedisClusterContainer implements RedisCommandsContainer, Closeable
 	}
 
 	@Override
+	public void open() throws Exception {
+
+		// echo() tries to open a connection and echos back the
+		// message passed as argument. Here we use it to monitor
+		// if we can communicate with the cluster.
+
+		jedisCluster.echo("Test");
+	}
+
+	@Override
 	public void hset(final String key, final String hashField, final String value) {
 		try {
 			jedisCluster.hset(key, hashField, value);

http://git-wip-us.apache.org/repos/asf/flink/blob/e9a06722/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainer.java b/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainer.java
index 1b92c2e..55dbfc2 100644
--- a/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainer.java
+++ b/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainer.java
@@ -25,6 +25,13 @@ import java.io.Serializable;
 public interface RedisCommandsContainer extends Serializable {
 
 	/**
+	 * Open the Jedis container.
+	 *
+	 * @throws Exception if the instance can not be opened properly
+	 */
+	void open() throws Exception;
+
+	/**
 	 * Sets field in the hash stored at key to value.
 	 * If key does not exist, a new key holding a hash is created.
 	 * If field already exists in the hash, it is overwritten.

http://git-wip-us.apache.org/repos/asf/flink/blob/e9a06722/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java b/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java
index 8684e9a..ba4bbda 100644
--- a/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java
+++ b/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java
@@ -29,7 +29,7 @@ import java.io.IOException;
 /**
  * Redis command container if we want to connect to a single Redis server or to Redis sentinels
  * If want to connect to a single Redis server, please use the first constructor {@link #RedisContainer(JedisPool)}.
- * If want to connect to a Redis sentinels, Please use the second constructor {@link #RedisContainer(JedisSentinelPool)}
+ * If want to connect to a Redis sentinels, please use the second constructor {@link #RedisContainer(JedisSentinelPool)}
  */
 public class RedisContainer implements RedisCommandsContainer, Closeable {
 
@@ -37,9 +37,8 @@ public class RedisContainer implements RedisCommandsContainer, Closeable {
 
 	private static final Logger LOG = LoggerFactory.getLogger(RedisContainer.class);
 
-	private final JedisPool jedisPool;
-	private final JedisSentinelPool jedisSentinelPool;
-
+	private transient JedisPool jedisPool;
+	private transient JedisSentinelPool jedisSentinelPool;
 
 	/**
 	 * Use this constructor if to connect with single Redis server.
@@ -77,6 +76,16 @@ public class RedisContainer implements RedisCommandsContainer, Closeable {
 	}
 
 	@Override
+	public void open() throws Exception {
+
+		// echo() tries to open a connection and echos back the
+		// message passed as argument. Here we use it to monitor
+		// if we can communicate with the cluster.
+
+		getInstance().echo("Test");
+	}
+
+	@Override
 	public void hset(final String key, final String hashField, final String value) {
 		Jedis jedis = null;
 		try {

http://git-wip-us.apache.org/repos/asf/flink/blob/e9a06722/flink-streaming-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkITCase.java b/flink-streaming-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkITCase.java
index 237d9e5..21f3cca 100644
--- a/flink-streaming-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkITCase.java
+++ b/flink-streaming-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkITCase.java
@@ -20,7 +20,6 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisClusterConfig;
 import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
 import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
 import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
@@ -31,7 +30,6 @@ import org.junit.Test;
 import redis.clients.jedis.Jedis;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
 
 public class RedisSinkITCase extends RedisITCaseBase {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e9a06722/flink-streaming-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkTest.java b/flink-streaming-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkTest.java
index 848af57..59f59f2 100644
--- a/flink-streaming-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkTest.java
+++ b/flink-streaming-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkTest.java
@@ -17,12 +17,23 @@
 package org.apache.flink.streaming.connectors.redis;
 
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisClusterConfig;
+import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisConfigBase;
+import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
+import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisSentinelConfig;
 import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
 import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
 import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
 import org.apache.flink.util.TestLogger;
 import org.junit.Test;
+import redis.clients.jedis.exceptions.JedisConnectionException;
+
+import java.net.InetSocketAddress;
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.junit.Assert.fail;
 
 public class RedisSinkTest extends TestLogger {
 
@@ -41,6 +52,74 @@ public class RedisSinkTest extends TestLogger {
 		new RedisSink<>(null, new TestMapper(new RedisCommandDescription(RedisCommand.LPUSH)));
 	}
 
+	@Test
+	public void testRedisDownBehavior() throws Exception {
+
+		// create a wrong configuration so that open() fails.
+
+		FlinkJedisPoolConfig wrongJedisPoolConfig = new FlinkJedisPoolConfig.Builder()
+			.setHost("127.0.0.1")
+			.setPort(1234).build();
+
+		testDownBehavior(wrongJedisPoolConfig);
+	}
+
+	@Test
+	public void testRedisClusterDownBehavior() throws Exception {
+
+		Set<InetSocketAddress> hosts = new HashSet<>();
+		hosts.add(new InetSocketAddress("127.0.0.1", 1234));
+
+		// create a wrong configuration so that open() fails.
+
+		FlinkJedisClusterConfig wrongJedisClusterConfig = new FlinkJedisClusterConfig.Builder()
+			.setNodes(hosts)
+			.setTimeout(100)
+			.setMaxIdle(1)
+			.setMaxTotal(1)
+			.setMinIdle(1).build();
+
+		testDownBehavior(wrongJedisClusterConfig);
+	}
+
+	@Test
+	public void testRedisSentinelDownBehavior() throws Exception {
+
+		Set<String> hosts = new HashSet<>();
+		hosts.add("localhost:55095");
+
+		// create a wrong configuration so that open() fails.
+
+		FlinkJedisSentinelConfig wrongJedisSentinelConfig = new FlinkJedisSentinelConfig.Builder()
+			.setMasterName("master")
+			.setSentinels(hosts)
+			.build();
+
+		testDownBehavior(wrongJedisSentinelConfig);
+	}
+
+	private void testDownBehavior(FlinkJedisConfigBase config) throws Exception {
+		RedisSink<Tuple2<String, String>> redisSink = new RedisSink<>(config,
+			new RedisSinkITCase.RedisCommandMapper(RedisCommand.SADD));
+
+		try {
+			redisSink.open(new Configuration());
+		} catch (Exception e) {
+
+			// search for nested JedisConnectionExceptions
+			// because this is the expected behavior
+
+			Throwable t = e;
+			int depth = 0;
+			while (!(t instanceof JedisConnectionException)) {
+				t = t.getCause();
+				if (t == null || depth++ == 20) {
+					throw e;
+				}
+			}
+		}
+	}
+
 	private class TestMapper implements RedisMapper<Tuple2<String, String>>{
 		private RedisCommandDescription redisCommandDescription;