You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/10/03 16:43:42 UTC
[pulsar] branch master updated: [pulsar-broker] Fix: invalidate
cache on zk-cache timeout (#5298)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new dc95abf [pulsar-broker] Fix: invalidate cache on zk-cache timeout (#5298)
dc95abf is described below
commit dc95abf4246c52d9ddb74e7d33a8074bbfc90786
Author: Rajan Dhabalia <rd...@apache.org>
AuthorDate: Thu Oct 3 09:43:35 2019 -0700
[pulsar-broker] Fix: invalidate cache on zk-cache timeout (#5298)
* [pulsar-broker] Fix: invalidate cache on zk-cache timeout
* add test
* fix test
---
.../pulsar/zookeeper/ZooKeeperDataCache.java | 8 ++-
.../pulsar/zookeeper/ZookeeperCacheTest.java | 65 ++++++++++++++++++++++
2 files changed, 72 insertions(+), 1 deletion(-)
diff --git a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperDataCache.java b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperDataCache.java
index 34a1545..b2d53d0 100644
--- a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperDataCache.java
+++ b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperDataCache.java
@@ -23,6 +23,7 @@ import java.util.Map.Entry;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import org.apache.pulsar.zookeeper.ZooKeeperCache.CacheUpdater;
@@ -92,7 +93,12 @@ public abstract class ZooKeeperDataCache<T> implements Deserializer<T>, CacheUpd
* @throws Exception
*/
public Optional<T> get(final String path) throws Exception {
- return getAsync(path).get(zkOperationTimeoutSeconds, TimeUnit.SECONDS);
+ try {
+ return getAsync(path).get(zkOperationTimeoutSeconds, TimeUnit.SECONDS); // bounded wait on the async lookup
+ }catch(TimeoutException e) {
+ cache.asyncInvalidate(path); // timed out: invalidate the cache entry for this path before rethrowing
+ throw e; // the caller still receives the original TimeoutException
+ }
}
public Optional<Entry<T, Stat>> getWithStat(final String path) throws Exception {
diff --git a/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZookeeperCacheTest.java b/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZookeeperCacheTest.java
index 834844a..a83b370 100644
--- a/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZookeeperCacheTest.java
+++ b/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZookeeperCacheTest.java
@@ -18,6 +18,10 @@
*/
package org.apache.pulsar.zookeeper;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.anyString;
+import static org.mockito.Mockito.spy;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
@@ -32,6 +36,7 @@ import com.google.common.util.concurrent.MoreExecutors;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.util.Collections;
+import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeSet;
import java.util.UUID;
@@ -41,14 +46,18 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Predicate;
import org.apache.bookkeeper.common.util.OrderedScheduler;
+import org.apache.zookeeper.AsyncCallback.DataCallback;
import org.apache.zookeeper.KeeperException.Code;
import org.apache.zookeeper.MockZooKeeper;
import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.AfterMethod;
@@ -492,4 +501,60 @@ public class ZookeeperCacheTest {
assertEquals(zkCache.getAsync(key1).get().get(), value);
zkExecutor.shutdown();
}
+
+ /**
+ * This test verifies that {@link ZooKeeperDataCache} invalidates the cache entry if the get-operation times out on
+ * that path.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testTimedOutZKCacheRequestInvalidates() throws Exception {
+
+ OrderedScheduler executor = OrderedScheduler.newSchedulerBuilder().build();
+ ScheduledExecutorService scheduledExecutor = Executors.newScheduledThreadPool(2); // NOTE(review): only created and shut down in this test
+ ExecutorService zkExecutor = Executors.newSingleThreadExecutor(new DefaultThreadFactory("mockZk")); // NOTE(review): only created and shut down in this test
+ MockZooKeeper zkSession = spy(MockZooKeeper.newInstance(MoreExecutors.newDirectExecutorService()));
+
+ String path = "test";
+ doNothing().when(zkSession).getData(anyString(), any(Watcher.class), any(DataCallback.class), any()); // stubbed to do nothing, so the DataCallback is never invoked
+ zkClient.create("/test", new byte[0], null, null); // NOTE(review): creates "/test" via zkClient, but the cache below reads "test" through zkSession - confirm intended
+
+ // getData is stubbed to do nothing above, so the read issued by the cache never gets a callback and the
+ // future it returns stays pending; the second constructor argument (1) is presumably the timeout in seconds
+ ZooKeeperCache zkCacheService = new LocalZooKeeperCache(zkSession, 1, executor);
+ ZooKeeperDataCache<String> zkCache = new ZooKeeperDataCache<String>(zkCacheService) {
+ @Override
+ public String deserialize(String key, byte[] content) throws Exception {
+ return new String(content);
+ }
+ };
+
+ // do a get on the path; it is expected to time out while the async-cache still holds a non-completed Future
+ try {
+ zkCache.get(path);
+ } catch (Exception e) {
+ // Ok: the failure itself is not asserted here, only the cache state checked below
+ }
+
+ retryStrategically((test) -> {
+ return zkCacheService.dataCache.getIfPresent(path) == null;
+ }, 5, 1000);
+
+ assertNull(zkCacheService.dataCache.getIfPresent(path)); // the entry for the path must be gone from the cache
+
+ executor.shutdown();
+ zkExecutor.shutdown();
+ scheduledExecutor.shutdown();
+ }
+
+ private static void retryStrategically(Predicate<Void> predicate, int retryCount, long intSleepTimeInMillis)
+ throws Exception { // polls the predicate up to retryCount times, sleeping between attempts
+ for (int i = 0; i < retryCount; i++) {
+ if (predicate.test(null) || i == (retryCount - 1)) { // stop when the condition holds; also skips the sleep after the last attempt
+ break;
+ }
+ Thread.sleep(intSleepTimeInMillis + (intSleepTimeInMillis * i)); // sleep grows with each attempt: base * (i + 1)
+ }
+ }
}