You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by cw...@apache.org on 2021/08/31 00:00:15 UTC
[druid] branch master updated: Fix an exception when using redis
cluster as cache (#11369)
This is an automated email from the ASF dual-hosted git repository.
cwylie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new c7e5fee Fix an exception when using redis cluster as cache (#11369)
c7e5fee is described below
commit c7e5fee4527c0d91b6fd12be58a67eb081984a44
Author: Frank Chen <fr...@outlook.com>
AuthorDate: Tue Aug 31 07:59:53 2021 +0800
Fix an exception when using redis cluster as cache (#11369)
* Redis mget problem in cluster mode
* Format code
* push down implementation of getBulk to sub-classes
* Add tests
* revert some changes
* Fix intelllij inspections
* Fix comments
Signed-off-by: frank chen <fr...@outlook.com>
* Update extensions-contrib/redis-cache/src/main/java/org/apache/druid/client/cache/RedisClusterCache.java
Co-authored-by: Benedict Jin <as...@apache.org>
* Update extensions-contrib/redis-cache/src/test/java/org/apache/druid/client/cache/RedisClusterCacheTest.java
Co-authored-by: Benedict Jin <as...@apache.org>
* Update extensions-contrib/redis-cache/src/main/java/org/apache/druid/client/cache/AbstractRedisCache.java
Co-authored-by: Benedict Jin <as...@apache.org>
* returns empty map in case of internal exception
Co-authored-by: Benedict Jin <as...@apache.org>
---
.../druid/client/cache/AbstractRedisCache.java | 31 +++++------
.../druid/client/cache/RedisClusterCache.java | 61 ++++++++++++++++++++--
.../druid/client/cache/RedisStandaloneCache.java | 23 ++++++--
.../druid/client/cache/RedisClusterCacheTest.java | 23 +++++---
4 files changed, 107 insertions(+), 31 deletions(-)
diff --git a/extensions-contrib/redis-cache/src/main/java/org/apache/druid/client/cache/AbstractRedisCache.java b/extensions-contrib/redis-cache/src/main/java/org/apache/druid/client/cache/AbstractRedisCache.java
index 192e374..da11b76 100644
--- a/extensions-contrib/redis-cache/src/main/java/org/apache/druid/client/cache/AbstractRedisCache.java
+++ b/extensions-contrib/redis-cache/src/main/java/org/apache/druid/client/cache/AbstractRedisCache.java
@@ -19,15 +19,14 @@
package org.apache.druid.client.cache;
-import com.google.common.collect.Lists;
+import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import redis.clients.jedis.exceptions.JedisException;
-import java.util.HashMap;
-import java.util.List;
+import java.util.Collections;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
@@ -93,21 +92,12 @@ public abstract class AbstractRedisCache implements Cache
public Map<NamedKey, byte[]> getBulk(Iterable<NamedKey> keys)
{
totalRequestCount.incrementAndGet();
- Map<NamedKey, byte[]> results = new HashMap<>();
-
try {
- List<NamedKey> namedKeys = Lists.newArrayList(keys);
- List<byte[]> byteKeys = Lists.transform(namedKeys, NamedKey::toByteArray);
-
- List<byte[]> byteValues = this.mgetFromRedis(byteKeys.toArray(new byte[0][]));
- for (int i = 0; i < byteValues.size(); ++i) {
- if (byteValues.get(i) != null) {
- results.put(namedKeys.get(i), byteValues.get(i));
- }
- }
+ Pair<Integer, Map<NamedKey, byte[]>> results = this.mgetFromRedis(keys);
- hitCount.addAndGet(results.size());
- missCount.addAndGet(namedKeys.size() - results.size());
+ hitCount.addAndGet(results.rhs.size());
+ missCount.addAndGet(results.lhs - results.rhs.size());
+ return results.rhs;
}
catch (JedisException e) {
if (e.getMessage().contains("Read timed out")) {
@@ -116,9 +106,8 @@ public abstract class AbstractRedisCache implements Cache
errorCount.incrementAndGet();
}
log.warn(e, "Exception pulling items from cache");
+ return Collections.emptyMap();
}
-
- return results;
}
@Override
@@ -172,7 +161,11 @@ public abstract class AbstractRedisCache implements Cache
protected abstract void putToRedis(byte[] key, byte[] value, RedisCacheConfig.DurationConfig expiration);
- protected abstract List<byte[]> mgetFromRedis(byte[]... keys);
+ /**
+ * The lhs of the returned pair is the count of input keys
+ * The rhs of the returned pair is a map holding the values of their corresponding keys
+ */
+ protected abstract Pair<Integer, Map<NamedKey, byte[]>> mgetFromRedis(Iterable<NamedKey> keys);
protected abstract void cleanup();
}
diff --git a/extensions-contrib/redis-cache/src/main/java/org/apache/druid/client/cache/RedisClusterCache.java b/extensions-contrib/redis-cache/src/main/java/org/apache/druid/client/cache/RedisClusterCache.java
index b71cce7..35827ba 100644
--- a/extensions-contrib/redis-cache/src/main/java/org/apache/druid/client/cache/RedisClusterCache.java
+++ b/extensions-contrib/redis-cache/src/main/java/org/apache/druid/client/cache/RedisClusterCache.java
@@ -19,14 +19,20 @@
package org.apache.druid.client.cache;
+import org.apache.druid.java.util.common.Pair;
import redis.clients.jedis.JedisCluster;
+import redis.clients.util.JedisClusterCRC16;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
public class RedisClusterCache extends AbstractRedisCache
{
- private JedisCluster cluster;
+ private final JedisCluster cluster;
RedisClusterCache(JedisCluster cluster, RedisCacheConfig config)
{
@@ -46,10 +52,59 @@ public class RedisClusterCache extends AbstractRedisCache
cluster.setex(key, (int) expiration.getSeconds(), value);
}
+ static class CachableKey
+ {
+ byte[] keyBytes;
+ NamedKey namedKey;
+
+ public CachableKey(NamedKey namedKey)
+ {
+ this.keyBytes = namedKey.toByteArray();
+ this.namedKey = namedKey;
+ }
+ }
+
+ /**
+ * Jedis does not work if the given keys are distributed among different redis nodes
+ * A simple workaround is to group keys by their slots and mget values for each slot.
+ * <p>
+ * In future, Jedis could be replaced by the Lettuce driver which supports mget operation on a redis cluster
+ */
@Override
- protected List<byte[]> mgetFromRedis(byte[]... keys)
+ protected Pair<Integer, Map<NamedKey, byte[]>> mgetFromRedis(Iterable<NamedKey> keys)
{
- return cluster.mget(keys);
+ int inputKeyCount = 0;
+
+ // group keys based on their slots
+ Map<Integer, List<CachableKey>> slot2Keys = new HashMap<>();
+ for (NamedKey key : keys) {
+ inputKeyCount++;
+
+ CachableKey cachableKey = new CachableKey(key);
+ int keySlot = JedisClusterCRC16.getSlot(cachableKey.keyBytes);
+ slot2Keys.computeIfAbsent(keySlot, val -> new ArrayList<>()).add(cachableKey);
+ }
+
+ ConcurrentHashMap<NamedKey, byte[]> results = new ConcurrentHashMap<>();
+ slot2Keys.keySet()
+ .parallelStream()
+ .forEach(slot -> {
+ List<CachableKey> keyList = slot2Keys.get(slot);
+
+ // mget for this slot
+ List<byte[]> values = cluster.mget(keyList.stream()
+ .map(key -> key.keyBytes)
+ .toArray(byte[][]::new));
+
+ for (int i = 0; i < keyList.size(); i++) {
+ byte[] value = values.get(i);
+ if (value != null) {
+ results.put(keyList.get(i).namedKey, value);
+ }
+ }
+ });
+
+ return new Pair<>(inputKeyCount, results);
}
@Override
diff --git a/extensions-contrib/redis-cache/src/main/java/org/apache/druid/client/cache/RedisStandaloneCache.java b/extensions-contrib/redis-cache/src/main/java/org/apache/druid/client/cache/RedisStandaloneCache.java
index d2e5c59..287d33c 100644
--- a/extensions-contrib/redis-cache/src/main/java/org/apache/druid/client/cache/RedisStandaloneCache.java
+++ b/extensions-contrib/redis-cache/src/main/java/org/apache/druid/client/cache/RedisStandaloneCache.java
@@ -19,14 +19,18 @@
package org.apache.druid.client.cache;
+import com.google.common.collect.Lists;
+import org.apache.druid.java.util.common.Pair;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
public class RedisStandaloneCache extends AbstractRedisCache
{
- private JedisPool pool;
+ private final JedisPool pool;
RedisStandaloneCache(JedisPool pool, RedisCacheConfig config)
{
@@ -52,10 +56,23 @@ public class RedisStandaloneCache extends AbstractRedisCache
}
@Override
- protected List<byte[]> mgetFromRedis(byte[]... keys)
+ protected Pair<Integer, Map<NamedKey, byte[]>> mgetFromRedis(Iterable<NamedKey> keys)
{
+ List<NamedKey> namedKeys = Lists.newArrayList(keys);
+ List<byte[]> byteKeys = Lists.transform(namedKeys, NamedKey::toByteArray);
+
try (Jedis jedis = pool.getResource()) {
- return jedis.mget(keys);
+
+ List<byte[]> byteValues = jedis.mget(byteKeys.toArray(new byte[0][]));
+
+ Map<NamedKey, byte[]> results = new HashMap<>();
+ for (int i = 0; i < byteValues.size(); ++i) {
+ if (byteValues.get(i) != null) {
+ results.put(namedKeys.get(i), byteValues.get(i));
+ }
+ }
+
+ return new Pair<>(namedKeys.size(), results);
}
}
diff --git a/extensions-contrib/redis-cache/src/test/java/org/apache/druid/client/cache/RedisClusterCacheTest.java b/extensions-contrib/redis-cache/src/test/java/org/apache/druid/client/cache/RedisClusterCacheTest.java
index bc439b4..c8a24b1 100644
--- a/extensions-contrib/redis-cache/src/test/java/org/apache/druid/client/cache/RedisClusterCacheTest.java
+++ b/extensions-contrib/redis-cache/src/test/java/org/apache/druid/client/cache/RedisClusterCacheTest.java
@@ -33,9 +33,10 @@ import redis.clients.jedis.JedisPoolConfig;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
public class RedisClusterCacheTest
{
@@ -58,6 +59,7 @@ public class RedisClusterCacheTest
};
private RedisClusterCache cache;
+ private AtomicLong mgetCount = new AtomicLong();
@Before
public void setUp()
@@ -71,7 +73,7 @@ public class RedisClusterCacheTest
// some methods must be overriden for test cases
cache = new RedisClusterCache(new MockJedisCluster(Collections.singleton(new HostAndPort("localhost", 6379)))
{
- Map<String, byte[]> cacheStorage = new HashMap<>();
+ final ConcurrentHashMap<String, byte[]> cacheStorage = new ConcurrentHashMap<>();
@Override
public String setex(final byte[] key, final int seconds, final byte[] value)
@@ -89,13 +91,12 @@ public class RedisClusterCacheTest
@Override
public List<byte[]> mget(final byte[]... keys)
{
+ mgetCount.incrementAndGet();
List<byte[]> ret = new ArrayList<>();
for (byte[] key : keys) {
String k = StringUtils.encodeBase64String(key);
byte[] value = cacheStorage.get(k);
- if (value != null) {
- ret.add(value);
- }
+ ret.add(value);
}
return ret;
}
@@ -122,6 +123,7 @@ public class RedisClusterCacheTest
Cache.NamedKey key1 = new Cache.NamedKey("the", HI);
Cache.NamedKey key2 = new Cache.NamedKey("the", HO);
Cache.NamedKey key3 = new Cache.NamedKey("a", HI);
+ Cache.NamedKey notExist = new Cache.NamedKey("notExist", HI);
//test put and get
cache.put(key1, new byte[]{1, 2, 3, 4});
@@ -130,15 +132,24 @@ public class RedisClusterCacheTest
Assert.assertEquals(0x01020304, Ints.fromByteArray(cache.get(key1)));
Assert.assertEquals(0x02030405, Ints.fromByteArray(cache.get(key2)));
Assert.assertEquals(0x03040506, Ints.fromByteArray(cache.get(key3)));
+ Assert.assertEquals(0x03040506, Ints.fromByteArray(cache.get(key3)));
+ Assert.assertNull(cache.get(notExist));
+
+ this.mgetCount.set(0);
//test multi get
Map<Cache.NamedKey, byte[]> result = cache.getBulk(
Lists.newArrayList(
key1,
key2,
- key3
+ key3,
+ notExist
)
);
+
+ // these 4 keys are distributed among different nodes, so there should be 4 times call of MGET
+ Assert.assertEquals(mgetCount.get(), 4);
+ Assert.assertEquals(result.size(), 3);
Assert.assertEquals(0x01020304, Ints.fromByteArray(result.get(key1)));
Assert.assertEquals(0x02030405, Ints.fromByteArray(result.get(key2)));
Assert.assertEquals(0x03040506, Ints.fromByteArray(result.get(key3)));
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org