You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/10/03 16:43:42 UTC

[pulsar] branch master updated: [pulsar-broker] Fix: invalidate cache on zk-cache timeout (#5298)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new dc95abf  [pulsar-broker] Fix: invalidate cache on zk-cache timeout (#5298)
dc95abf is described below

commit dc95abf4246c52d9ddb74e7d33a8074bbfc90786
Author: Rajan Dhabalia <rd...@apache.org>
AuthorDate: Thu Oct 3 09:43:35 2019 -0700

    [pulsar-broker] Fix: invalidate cache on zk-cache timeout (#5298)
    
    * [pulsar-broker] Fix: invalidate cache on zk-cache timeout
    
    * add test
    
    * fix test
---
 .../pulsar/zookeeper/ZooKeeperDataCache.java       |  8 ++-
 .../pulsar/zookeeper/ZookeeperCacheTest.java       | 65 ++++++++++++++++++++++
 2 files changed, 72 insertions(+), 1 deletion(-)

diff --git a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperDataCache.java b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperDataCache.java
index 34a1545..b2d53d0 100644
--- a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperDataCache.java
+++ b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperDataCache.java
@@ -23,6 +23,7 @@ import java.util.Map.Entry;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 
 import org.apache.pulsar.zookeeper.ZooKeeperCache.CacheUpdater;
@@ -92,7 +93,12 @@ public abstract class ZooKeeperDataCache<T> implements Deserializer<T>, CacheUpd
      * @throws Exception
      */
     public Optional<T> get(final String path) throws Exception {
-        return getAsync(path).get(zkOperationTimeoutSeconds, TimeUnit.SECONDS);
+        try {
+            return getAsync(path).get(zkOperationTimeoutSeconds, TimeUnit.SECONDS);    
+        }catch(TimeoutException e) {
+            cache.asyncInvalidate(path);
+            throw e;
+        }
     }
 
     public Optional<Entry<T, Stat>> getWithStat(final String path) throws Exception {
diff --git a/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZookeeperCacheTest.java b/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZookeeperCacheTest.java
index 834844a..a83b370 100644
--- a/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZookeeperCacheTest.java
+++ b/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZookeeperCacheTest.java
@@ -18,6 +18,10 @@
  */
 package org.apache.pulsar.zookeeper;
 
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.anyString;
+import static org.mockito.Mockito.spy;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertTrue;
@@ -32,6 +36,7 @@ import com.google.common.util.concurrent.MoreExecutors;
 import io.netty.util.concurrent.DefaultThreadFactory;
 
 import java.util.Collections;
+import java.util.Map.Entry;
 import java.util.Set;
 import java.util.TreeSet;
 import java.util.UUID;
@@ -41,14 +46,18 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Predicate;
 
 import org.apache.bookkeeper.common.util.OrderedScheduler;
+import org.apache.zookeeper.AsyncCallback.DataCallback;
 import org.apache.zookeeper.KeeperException.Code;
 import org.apache.zookeeper.MockZooKeeper;
 import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.Watcher.Event;
 import org.apache.zookeeper.Watcher.Event.KeeperState;
 import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.AfterMethod;
@@ -492,4 +501,60 @@ public class ZookeeperCacheTest {
         assertEquals(zkCache.getAsync(key1).get().get(), value);
         zkExecutor.shutdown();
     }
+    
+    /**
+     * This test verifies that {@link ZooKeeperDataCache} invalidates the cache if the get-operation times out on that
+     * path.
+     * 
+     * @throws Exception
+     */
+    @Test
+    public void testTimedOutZKCacheRequestInvalidates() throws Exception {
+
+        OrderedScheduler executor = OrderedScheduler.newSchedulerBuilder().build();
+        ScheduledExecutorService scheduledExecutor = Executors.newScheduledThreadPool(2);
+        ExecutorService zkExecutor = Executors.newSingleThreadExecutor(new DefaultThreadFactory("mockZk"));
+        MockZooKeeper zkSession = spy(MockZooKeeper.newInstance(MoreExecutors.newDirectExecutorService()));
+
+        String path = "test";
+        doNothing().when(zkSession).getData(anyString(), any(Watcher.class), any(DataCallback.class), any());
+        zkClient.create("/test", new byte[0], null, null);
+
+        // add readOpDelayMs so that the main thread will not serve the zkCache-returned future and the zkExecutor thread
+        // handles the callback-result processing
+        ZooKeeperCache zkCacheService = new LocalZooKeeperCache(zkSession, 1, executor);
+        ZooKeeperDataCache<String> zkCache = new ZooKeeperDataCache<String>(zkCacheService) {
+            @Override
+            public String deserialize(String key, byte[] content) throws Exception {
+                return new String(content);
+            }
+        };
+
+        // try a get on the path, which will time out and leave a non-completed Future in the async cache
+        try {
+            zkCache.get(path);
+        } catch (Exception e) {
+            // Ok
+        }
+
+        retryStrategically((test) -> {
+            return zkCacheService.dataCache.getIfPresent(path) == null;
+        }, 5, 1000);
+
+        assertNull(zkCacheService.dataCache.getIfPresent(path));
+
+        executor.shutdown();
+        zkExecutor.shutdown();
+        scheduledExecutor.shutdown();
+    }
+
+    private static void retryStrategically(Predicate<Void> predicate, int retryCount, long intSleepTimeInMillis)
+            throws Exception {
+        for (int i = 0; i < retryCount; i++) {
+            if (predicate.test(null) || i == (retryCount - 1)) {
+                break;
+            }
+            Thread.sleep(intSleepTimeInMillis + (intSleepTimeInMillis * i));
+        }
+    }
 }