You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by pv...@apache.org on 2018/12/06 22:31:37 UTC
nifi git commit: NIFI-5830 - RedisConnectionPoolService does not work
with Standalone Redis using non-localhost deployment
Repository: nifi
Updated Branches:
refs/heads/master f1e03b5ed -> 84c32f913
NIFI-5830 - RedisConnectionPoolService does not work with Standalone Redis using non-localhost deployment
Signed-off-by: Pierre Villard <pi...@gmail.com>
This closes #3176.
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/84c32f91
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/84c32f91
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/84c32f91
Branch: refs/heads/master
Commit: 84c32f913780d585ba16960c3f23e83313e1bcab
Parents: f1e03b5
Author: Alexander Bukarev <bu...@yandex.ru>
Authored: Tue Dec 4 22:15:37 2018 +0300
Committer: Pierre Villard <pi...@gmail.com>
Committed: Thu Dec 6 23:31:25 2018 +0100
----------------------------------------------------------------------
.../org/apache/nifi/redis/util/RedisUtils.java | 60 ++++++++++----------
1 file changed, 30 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/84c32f91/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/util/RedisUtils.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/util/RedisUtils.java b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/util/RedisUtils.java
index aed823b..6489fcc 100644
--- a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/util/RedisUtils.java
+++ b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/util/RedisUtils.java
@@ -27,11 +27,15 @@ import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.redis.RedisType;
import org.apache.nifi.util.StringUtils;
import org.springframework.data.redis.connection.RedisClusterConfiguration;
+import org.springframework.data.redis.connection.RedisConfiguration;
+import org.springframework.data.redis.connection.RedisPassword;
import org.springframework.data.redis.connection.RedisSentinelConfiguration;
+import org.springframework.data.redis.connection.RedisStandaloneConfiguration;
+import org.springframework.data.redis.connection.jedis.JedisClientConfiguration;
import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
import redis.clients.jedis.JedisPoolConfig;
-import redis.clients.jedis.JedisShardInfo;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
@@ -264,19 +268,29 @@ public class RedisUtils {
final Integer timeout = context.getProperty(RedisUtils.COMMUNICATION_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
final JedisPoolConfig poolConfig = createJedisPoolConfig(context);
+ final JedisClientConfiguration jedisClientConfiguration = JedisClientConfiguration.builder()
+ .connectTimeout(Duration.ofMillis(timeout))
+ .readTimeout(Duration.ofMillis(timeout))
+ .usePooling()
+ .poolConfig(poolConfig)
+ .build();
JedisConnectionFactory connectionFactory;
if (RedisUtils.REDIS_MODE_STANDALONE.getValue().equals(redisMode)) {
- final JedisShardInfo jedisShardInfo = createJedisShardInfo(connectionString, timeout, password);
-
logger.info("Connecting to Redis in standalone mode at " + connectionString);
- connectionFactory = new JedisConnectionFactory(jedisShardInfo);
+ final String[] hostAndPortSplit = connectionString.split("[:]");
+ final String host = hostAndPortSplit[0].trim();
+ final Integer port = Integer.parseInt(hostAndPortSplit[1].trim());
+ final RedisStandaloneConfiguration redisStandaloneConfiguration = new RedisStandaloneConfiguration(host, port);
+ enrichRedisConfiguration(redisStandaloneConfiguration, dbIndex, password);
+
+ connectionFactory = new JedisConnectionFactory(redisStandaloneConfiguration, jedisClientConfiguration);
} else if (RedisUtils.REDIS_MODE_SENTINEL.getValue().equals(redisMode)) {
final String[] sentinels = connectionString.split("[,]");
final String sentinelMaster = context.getProperty(RedisUtils.SENTINEL_MASTER).evaluateAttributeExpressions().getValue();
final RedisSentinelConfiguration sentinelConfiguration = new RedisSentinelConfiguration(sentinelMaster, new HashSet<>(getTrimmedValues(sentinels)));
- final JedisShardInfo jedisShardInfo = createJedisShardInfo(sentinels[0], timeout, password);
+ enrichRedisConfiguration(sentinelConfiguration, dbIndex, password);
logger.info("Connecting to Redis in sentinel mode...");
logger.info("Redis master = " + sentinelMaster);
@@ -285,14 +299,14 @@ public class RedisUtils {
logger.info("Redis sentinel at " + sentinel);
}
- connectionFactory = new JedisConnectionFactory(sentinelConfiguration, poolConfig);
- connectionFactory.setShardInfo(jedisShardInfo);
+ connectionFactory = new JedisConnectionFactory(sentinelConfiguration, jedisClientConfiguration);
} else {
final String[] clusterNodes = connectionString.split("[,]");
final Integer maxRedirects = context.getProperty(RedisUtils.CLUSTER_MAX_REDIRECTS).asInteger();
final RedisClusterConfiguration clusterConfiguration = new RedisClusterConfiguration(getTrimmedValues(clusterNodes));
+ enrichRedisConfiguration(clusterConfiguration, dbIndex, password);
clusterConfiguration.setMaxRedirects(maxRedirects);
logger.info("Connecting to Redis in clustered mode...");
@@ -300,16 +314,7 @@ public class RedisUtils {
logger.info("Redis cluster node at " + clusterNode);
}
- connectionFactory = new JedisConnectionFactory(clusterConfiguration, poolConfig);
- }
-
- connectionFactory.setUsePool(true);
- connectionFactory.setPoolConfig(poolConfig);
- connectionFactory.setDatabase(dbIndex);
- connectionFactory.setTimeout(timeout);
-
- if (!StringUtils.isBlank(password)) {
- connectionFactory.setPassword(password);
+ connectionFactory = new JedisConnectionFactory(clusterConfiguration, jedisClientConfiguration);
}
// need to call this to initialize the pool/connections
@@ -325,20 +330,15 @@ public class RedisUtils {
return trimmedValues;
}
- private static JedisShardInfo createJedisShardInfo(final String hostAndPort, final Integer timeout, final String password) {
- final String[] hostAndPortSplit = hostAndPort.split("[:]");
- final String host = hostAndPortSplit[0].trim();
- final Integer port = Integer.parseInt(hostAndPortSplit[1].trim());
-
- final JedisShardInfo jedisShardInfo = new JedisShardInfo(host, port);
- jedisShardInfo.setConnectionTimeout(timeout);
- jedisShardInfo.setSoTimeout(timeout);
-
- if (!StringUtils.isEmpty(password)) {
- jedisShardInfo.setPassword(password);
+ private static void enrichRedisConfiguration(final RedisConfiguration redisConfiguration,
+ final Integer dbIndex,
+ final String password) {
+ if (redisConfiguration instanceof RedisConfiguration.WithDatabaseIndex) {
+ ((RedisConfiguration.WithDatabaseIndex) redisConfiguration).setDatabase(dbIndex);
+ }
+ if (redisConfiguration instanceof RedisConfiguration.WithPassword) {
+ ((RedisConfiguration.WithPassword) redisConfiguration).setPassword(RedisPassword.of(password));
}
-
- return jedisShardInfo;
}
private static JedisPoolConfig createJedisPoolConfig(final PropertyContext context) {