You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by cw...@apache.org on 2021/08/31 00:00:15 UTC

[druid] branch master updated: Fix an exception when using redis cluster as cache (#11369)

This is an automated email from the ASF dual-hosted git repository.

cwylie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new c7e5fee  Fix an exception when using redis cluster as cache (#11369)
c7e5fee is described below

commit c7e5fee4527c0d91b6fd12be58a67eb081984a44
Author: Frank Chen <fr...@outlook.com>
AuthorDate: Tue Aug 31 07:59:53 2021 +0800

    Fix an exception when using redis cluster as cache (#11369)
    
    * Redis mget problem in cluster mode
    
    * Format code
    
    * push down implementation of getBulk to sub-classes
    
    * Add tests
    
    * revert some changes
    
    * Fix IntelliJ inspections
    
    * Fix comments
    
    Signed-off-by: frank chen <fr...@outlook.com>
    
    * Update extensions-contrib/redis-cache/src/main/java/org/apache/druid/client/cache/RedisClusterCache.java
    
    Co-authored-by: Benedict Jin <as...@apache.org>
    
    * Update extensions-contrib/redis-cache/src/test/java/org/apache/druid/client/cache/RedisClusterCacheTest.java
    
    Co-authored-by: Benedict Jin <as...@apache.org>
    
    * Update extensions-contrib/redis-cache/src/main/java/org/apache/druid/client/cache/AbstractRedisCache.java
    
    Co-authored-by: Benedict Jin <as...@apache.org>
    
    * returns empty map in case of internal exception
    
    Co-authored-by: Benedict Jin <as...@apache.org>
---
 .../druid/client/cache/AbstractRedisCache.java     | 31 +++++------
 .../druid/client/cache/RedisClusterCache.java      | 61 ++++++++++++++++++++--
 .../druid/client/cache/RedisStandaloneCache.java   | 23 ++++++--
 .../druid/client/cache/RedisClusterCacheTest.java  | 23 +++++---
 4 files changed, 107 insertions(+), 31 deletions(-)

diff --git a/extensions-contrib/redis-cache/src/main/java/org/apache/druid/client/cache/AbstractRedisCache.java b/extensions-contrib/redis-cache/src/main/java/org/apache/druid/client/cache/AbstractRedisCache.java
index 192e374..da11b76 100644
--- a/extensions-contrib/redis-cache/src/main/java/org/apache/druid/client/cache/AbstractRedisCache.java
+++ b/extensions-contrib/redis-cache/src/main/java/org/apache/druid/client/cache/AbstractRedisCache.java
@@ -19,15 +19,14 @@
 
 package org.apache.druid.client.cache;
 
-import com.google.common.collect.Lists;
+import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
 import redis.clients.jedis.exceptions.JedisException;
 
-import java.util.HashMap;
-import java.util.List;
+import java.util.Collections;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -93,21 +92,12 @@ public abstract class AbstractRedisCache implements Cache
   public Map<NamedKey, byte[]> getBulk(Iterable<NamedKey> keys)
   {
     totalRequestCount.incrementAndGet();
-    Map<NamedKey, byte[]> results = new HashMap<>();
-
     try {
-      List<NamedKey> namedKeys = Lists.newArrayList(keys);
-      List<byte[]> byteKeys = Lists.transform(namedKeys, NamedKey::toByteArray);
-
-      List<byte[]> byteValues = this.mgetFromRedis(byteKeys.toArray(new byte[0][]));
-      for (int i = 0; i < byteValues.size(); ++i) {
-        if (byteValues.get(i) != null) {
-          results.put(namedKeys.get(i), byteValues.get(i));
-        }
-      }
+      Pair<Integer, Map<NamedKey, byte[]>> results = this.mgetFromRedis(keys);
 
-      hitCount.addAndGet(results.size());
-      missCount.addAndGet(namedKeys.size() - results.size());
+      hitCount.addAndGet(results.rhs.size());
+      missCount.addAndGet(results.lhs - results.rhs.size());
+      return results.rhs;
     }
     catch (JedisException e) {
       if (e.getMessage().contains("Read timed out")) {
@@ -116,9 +106,8 @@ public abstract class AbstractRedisCache implements Cache
         errorCount.incrementAndGet();
       }
       log.warn(e, "Exception pulling items from cache");
+      return Collections.emptyMap();
     }
-
-    return results;
   }
 
   @Override
@@ -172,7 +161,11 @@ public abstract class AbstractRedisCache implements Cache
 
   protected abstract void putToRedis(byte[] key, byte[] value, RedisCacheConfig.DurationConfig expiration);
 
-  protected abstract List<byte[]> mgetFromRedis(byte[]... keys);
+  /**
+   * The lhs of the returned pair is the number of input keys.
+   * The rhs of the returned pair maps each key that has a value in Redis to that value; other keys are omitted.
+   */
+  protected abstract Pair<Integer, Map<NamedKey, byte[]>> mgetFromRedis(Iterable<NamedKey> keys);
 
   protected abstract void cleanup();
 }
diff --git a/extensions-contrib/redis-cache/src/main/java/org/apache/druid/client/cache/RedisClusterCache.java b/extensions-contrib/redis-cache/src/main/java/org/apache/druid/client/cache/RedisClusterCache.java
index b71cce7..35827ba 100644
--- a/extensions-contrib/redis-cache/src/main/java/org/apache/druid/client/cache/RedisClusterCache.java
+++ b/extensions-contrib/redis-cache/src/main/java/org/apache/druid/client/cache/RedisClusterCache.java
@@ -19,14 +19,20 @@
 
 package org.apache.druid.client.cache;
 
+import org.apache.druid.java.util.common.Pair;
 import redis.clients.jedis.JedisCluster;
+import redis.clients.util.JedisClusterCRC16;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 public class RedisClusterCache extends AbstractRedisCache
 {
-  private JedisCluster cluster;
+  private final JedisCluster cluster;
 
   RedisClusterCache(JedisCluster cluster, RedisCacheConfig config)
   {
@@ -46,10 +52,59 @@ public class RedisClusterCache extends AbstractRedisCache
     cluster.setex(key, (int) expiration.getSeconds(), value);
   }
 
+  static class CachableKey
+  {
+    byte[] keyBytes;
+    NamedKey namedKey;
+
+    public CachableKey(NamedKey namedKey)
+    {
+      this.keyBytes = namedKey.toByteArray();
+      this.namedKey = namedKey;
+    }
+  }
+
+  /**
+   * JedisCluster#mget fails unless all of the given keys map to the same hash slot.
+   * A simple workaround is to group the keys by slot and issue one mget per slot.
+   * <p>
+   * In the future, Jedis could be replaced by the Lettuce driver, which supports mget across a redis cluster.
+   */
   @Override
-  protected List<byte[]> mgetFromRedis(byte[]... keys)
+  protected Pair<Integer, Map<NamedKey, byte[]>> mgetFromRedis(Iterable<NamedKey> keys)
   {
-    return cluster.mget(keys);
+    int inputKeyCount = 0;
+
+    // group keys based on their slots
+    Map<Integer, List<CachableKey>> slot2Keys = new HashMap<>();
+    for (NamedKey key : keys) {
+      inputKeyCount++;
+
+      CachableKey cachableKey = new CachableKey(key);
+      int keySlot = JedisClusterCRC16.getSlot(cachableKey.keyBytes);
+      slot2Keys.computeIfAbsent(keySlot, val -> new ArrayList<>()).add(cachableKey);
+    }
+
+    ConcurrentHashMap<NamedKey, byte[]> results = new ConcurrentHashMap<>();
+    slot2Keys.keySet()
+             .parallelStream()
+             .forEach(slot -> {
+               List<CachableKey> keyList = slot2Keys.get(slot);
+
+               // mget for this slot
+               List<byte[]> values = cluster.mget(keyList.stream()
+                                                         .map(key -> key.keyBytes)
+                                                         .toArray(byte[][]::new));
+
+               for (int i = 0; i < keyList.size(); i++) {
+                 byte[] value = values.get(i);
+                 if (value != null) {
+                   results.put(keyList.get(i).namedKey, value);
+                 }
+               }
+             });
+
+    return new Pair<>(inputKeyCount, results);
   }
 
   @Override
diff --git a/extensions-contrib/redis-cache/src/main/java/org/apache/druid/client/cache/RedisStandaloneCache.java b/extensions-contrib/redis-cache/src/main/java/org/apache/druid/client/cache/RedisStandaloneCache.java
index d2e5c59..287d33c 100644
--- a/extensions-contrib/redis-cache/src/main/java/org/apache/druid/client/cache/RedisStandaloneCache.java
+++ b/extensions-contrib/redis-cache/src/main/java/org/apache/druid/client/cache/RedisStandaloneCache.java
@@ -19,14 +19,18 @@
 
 package org.apache.druid.client.cache;
 
+import com.google.common.collect.Lists;
+import org.apache.druid.java.util.common.Pair;
 import redis.clients.jedis.Jedis;
 import redis.clients.jedis.JedisPool;
 
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 public class RedisStandaloneCache extends AbstractRedisCache
 {
-  private JedisPool pool;
+  private final JedisPool pool;
 
   RedisStandaloneCache(JedisPool pool, RedisCacheConfig config)
   {
@@ -52,10 +56,23 @@ public class RedisStandaloneCache extends AbstractRedisCache
   }
 
   @Override
-  protected List<byte[]> mgetFromRedis(byte[]... keys)
+  protected Pair<Integer, Map<NamedKey, byte[]>> mgetFromRedis(Iterable<NamedKey> keys)
   {
+    List<NamedKey> namedKeys = Lists.newArrayList(keys);
+    List<byte[]> byteKeys = Lists.transform(namedKeys, NamedKey::toByteArray);
+
     try (Jedis jedis = pool.getResource()) {
-      return jedis.mget(keys);
+
+      List<byte[]> byteValues = jedis.mget(byteKeys.toArray(new byte[0][]));
+
+      Map<NamedKey, byte[]> results = new HashMap<>();
+      for (int i = 0; i < byteValues.size(); ++i) {
+        if (byteValues.get(i) != null) {
+          results.put(namedKeys.get(i), byteValues.get(i));
+        }
+      }
+
+      return new Pair<>(namedKeys.size(), results);
     }
   }
 
diff --git a/extensions-contrib/redis-cache/src/test/java/org/apache/druid/client/cache/RedisClusterCacheTest.java b/extensions-contrib/redis-cache/src/test/java/org/apache/druid/client/cache/RedisClusterCacheTest.java
index bc439b4..c8a24b1 100644
--- a/extensions-contrib/redis-cache/src/test/java/org/apache/druid/client/cache/RedisClusterCacheTest.java
+++ b/extensions-contrib/redis-cache/src/test/java/org/apache/druid/client/cache/RedisClusterCacheTest.java
@@ -33,9 +33,10 @@ import redis.clients.jedis.JedisPoolConfig;
 
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
 
 public class RedisClusterCacheTest
 {
@@ -58,6 +59,7 @@ public class RedisClusterCacheTest
   };
 
   private RedisClusterCache cache;
+  private AtomicLong mgetCount = new AtomicLong();
 
   @Before
   public void setUp()
@@ -71,7 +73,7 @@ public class RedisClusterCacheTest
     // some methods must be overriden for test cases
     cache = new RedisClusterCache(new MockJedisCluster(Collections.singleton(new HostAndPort("localhost", 6379)))
     {
-      Map<String, byte[]> cacheStorage = new HashMap<>();
+      final ConcurrentHashMap<String, byte[]> cacheStorage = new ConcurrentHashMap<>();
 
       @Override
       public String setex(final byte[] key, final int seconds, final byte[] value)
@@ -89,13 +91,12 @@ public class RedisClusterCacheTest
       @Override
       public List<byte[]> mget(final byte[]... keys)
       {
+        mgetCount.incrementAndGet();
         List<byte[]> ret = new ArrayList<>();
         for (byte[] key : keys) {
           String k = StringUtils.encodeBase64String(key);
           byte[] value = cacheStorage.get(k);
-          if (value != null) {
-            ret.add(value);
-          }
+          ret.add(value);
         }
         return ret;
       }
@@ -122,6 +123,7 @@ public class RedisClusterCacheTest
     Cache.NamedKey key1 = new Cache.NamedKey("the", HI);
     Cache.NamedKey key2 = new Cache.NamedKey("the", HO);
     Cache.NamedKey key3 = new Cache.NamedKey("a", HI);
+    Cache.NamedKey notExist = new Cache.NamedKey("notExist", HI);
 
     //test put and get
     cache.put(key1, new byte[]{1, 2, 3, 4});
@@ -130,15 +132,24 @@ public class RedisClusterCacheTest
     Assert.assertEquals(0x01020304, Ints.fromByteArray(cache.get(key1)));
     Assert.assertEquals(0x02030405, Ints.fromByteArray(cache.get(key2)));
     Assert.assertEquals(0x03040506, Ints.fromByteArray(cache.get(key3)));
+    Assert.assertEquals(0x03040506, Ints.fromByteArray(cache.get(key3)));
+    Assert.assertNull(cache.get(notExist));
+
+    this.mgetCount.set(0);
 
     //test multi get
     Map<Cache.NamedKey, byte[]> result = cache.getBulk(
         Lists.newArrayList(
             key1,
             key2,
-            key3
+            key3,
+            notExist
         )
     );
+
+    // these 4 keys map to 4 different hash slots, so MGET should be called 4 times
+    Assert.assertEquals(mgetCount.get(), 4);
+    Assert.assertEquals(result.size(), 3);
     Assert.assertEquals(0x01020304, Ints.fromByteArray(result.get(key1)));
     Assert.assertEquals(0x02030405, Ints.fromByteArray(result.get(key2)));
     Assert.assertEquals(0x03040506, Ints.fromByteArray(result.get(key3)));

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org