You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by xy...@apache.org on 2023/09/28 01:32:31 UTC
[helix] branch metaclient updated: Lattice cache - caching just data implementation (#2619)
This is an automated email from the ASF dual-hosted git repository.
xyuanlu pushed a commit to branch metaclient
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/metaclient by this push:
new c147eb4c0 Lattice cache - caching just data implementation (#2619)
c147eb4c0 is described below
commit c147eb4c0e03a13c99d324d549e389d3d689baa3
Author: Marcos Rico Peng <55...@users.noreply.github.com>
AuthorDate: Wed Sep 27 18:32:25 2023 -0700
Lattice cache - caching just data implementation (#2619)
Lattice cache - caching just data implementation
---------
Co-authored-by: mapeng <ma...@linkedin.com>
---
.../metaclient/api/MetaClientCacheInterface.java | 2 +-
.../factories/MetaClientCacheConfig.java | 5 -
.../metaclient/factories/MetaClientFactory.java | 25 ++--
.../metaclient/impl/zk/ZkMetaClientCache.java | 153 ++++++++++++++++++---
.../metaclient/impl/zk/TestZkMetaClientCache.java | 85 ++++++++++--
5 files changed, 220 insertions(+), 50 deletions(-)
diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/api/MetaClientCacheInterface.java b/meta-client/src/main/java/org/apache/helix/metaclient/api/MetaClientCacheInterface.java
index 348bd0929..6449970bb 100644
--- a/meta-client/src/main/java/org/apache/helix/metaclient/api/MetaClientCacheInterface.java
+++ b/meta-client/src/main/java/org/apache/helix/metaclient/api/MetaClientCacheInterface.java
@@ -36,7 +36,7 @@ public interface MetaClientCacheInterface<T> extends MetaClientInterface<T> {
private final String _nodeKey;
- TrieNode(String path, String nodeKey) {
+ public TrieNode(String path, String nodeKey) {
_path = path;
_nodeKey = nodeKey;
_children = new HashMap<>();
diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/factories/MetaClientCacheConfig.java b/meta-client/src/main/java/org/apache/helix/metaclient/factories/MetaClientCacheConfig.java
index 9e0323601..07972945a 100644
--- a/meta-client/src/main/java/org/apache/helix/metaclient/factories/MetaClientCacheConfig.java
+++ b/meta-client/src/main/java/org/apache/helix/metaclient/factories/MetaClientCacheConfig.java
@@ -25,13 +25,11 @@ public class MetaClientCacheConfig {
private final String _rootEntry;
private boolean _cacheData = false;
private boolean _cacheChildren = false;
- private boolean _lazyCaching = true;
public MetaClientCacheConfig(String rootEntry, boolean cacheData, boolean cacheChildren, boolean lazyCaching) {
_rootEntry = rootEntry;
_cacheData = cacheData;
_cacheChildren = cacheChildren;
- _lazyCaching = lazyCaching;
}
public String getRootEntry() {
@@ -46,7 +44,4 @@ public class MetaClientCacheConfig {
return _cacheChildren;
}
- public boolean getLazyCaching() {
- return _lazyCaching;
- }
}
diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/factories/MetaClientFactory.java b/meta-client/src/main/java/org/apache/helix/metaclient/factories/MetaClientFactory.java
index ebb4549da..7cc86a8a9 100644
--- a/meta-client/src/main/java/org/apache/helix/metaclient/factories/MetaClientFactory.java
+++ b/meta-client/src/main/java/org/apache/helix/metaclient/factories/MetaClientFactory.java
@@ -40,13 +40,7 @@ public class MetaClientFactory {
throw new IllegalArgumentException("MetaClientConfig cannot be null.");
}
if (MetaClientConfig.StoreType.ZOOKEEPER.equals(config.getStoreType())) {
- ZkMetaClientConfig zkMetaClientConfig = new ZkMetaClientConfig.ZkMetaClientConfigBuilder().
- setConnectionAddress(config.getConnectionAddress())
- .setMetaClientReconnectPolicy(config.getMetaClientReconnectPolicy())
- .setConnectionInitTimeoutInMillis(config.getConnectionInitTimeoutInMillis())
- .setSessionTimeoutInMillis(config.getSessionTimeoutInMillis())
- .build();
- return new ZkMetaClientFactory().getMetaClient(zkMetaClientConfig);
+ return new ZkMetaClientFactory().getMetaClient(createZkMetaClientConfig(config));
}
return null;
}
@@ -56,14 +50,17 @@ public class MetaClientFactory {
throw new IllegalArgumentException("MetaClientConfig cannot be null.");
}
if (MetaClientConfig.StoreType.ZOOKEEPER.equals(config.getStoreType())) {
- ZkMetaClientConfig zkMetaClientConfig = new ZkMetaClientConfig.ZkMetaClientConfigBuilder().
- setConnectionAddress(config.getConnectionAddress())
- .setMetaClientReconnectPolicy(config.getMetaClientReconnectPolicy())
- .setConnectionInitTimeoutInMillis(config.getConnectionInitTimeoutInMillis())
- .setSessionTimeoutInMillis(config.getSessionTimeoutInMillis())
- .build();
- return new ZkMetaClientFactory().getMetaClientCache(zkMetaClientConfig, cacheConfig);
+ return new ZkMetaClientFactory().getMetaClientCache(createZkMetaClientConfig(config), cacheConfig);
}
return null;
}
+
+ private ZkMetaClientConfig createZkMetaClientConfig(MetaClientConfig config) {
+ return new ZkMetaClientConfig.ZkMetaClientConfigBuilder().
+ setConnectionAddress(config.getConnectionAddress())
+ .setMetaClientReconnectPolicy(config.getMetaClientReconnectPolicy())
+ .setConnectionInitTimeoutInMillis(config.getConnectionInitTimeoutInMillis())
+ .setSessionTimeoutInMillis(config.getSessionTimeoutInMillis())
+ .build();
+ }
}
diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClientCache.java b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClientCache.java
index af1c9d791..66eb6f91e 100644
--- a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClientCache.java
+++ b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClientCache.java
@@ -21,31 +21,38 @@ package org.apache.helix.metaclient.impl.zk;
import org.apache.helix.metaclient.api.ChildChangeListener;
import org.apache.helix.metaclient.api.MetaClientCacheInterface;
-import org.apache.helix.metaclient.datamodel.DataRecord;
import org.apache.helix.metaclient.exception.MetaClientException;
import org.apache.helix.metaclient.factories.MetaClientCacheConfig;
-import org.apache.helix.metaclient.factories.MetaClientConfig;
+import org.apache.helix.metaclient.impl.zk.adapter.ChildListenerAdapter;
import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientConfig;
-import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientFactory;
-import org.apache.helix.metaclient.recipes.lock.LockInfoSerializer;
import org.apache.helix.zookeeper.zkclient.ZkClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
import java.util.List;
-import java.util.Map;
+import java.util.Queue;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
public class ZkMetaClientCache<T> extends ZkMetaClient<T> implements MetaClientCacheInterface<T> {
- private Map<String, DataRecord> _dataCacheMap;
+ private ConcurrentHashMap<String, T> _dataCacheMap;
private final String _rootEntry;
private TrieNode _childrenCacheTree;
private ChildChangeListener _eventListener;
private boolean _cacheData;
private boolean _cacheChildren;
- private boolean _lazyCaching;
private static final Logger LOG = LoggerFactory.getLogger(ZkMetaClientCache.class);
private ZkClient _cacheClient;
+ private ExecutorService executor;
+
+ // TODO: Look into using conditional variable instead of latch.
+ private final CountDownLatch _initializedCache = new CountDownLatch(1);
/**
* Constructor for ZkMetaClientCache.
@@ -56,19 +63,43 @@ public class ZkMetaClientCache<T> extends ZkMetaClient<T> implements MetaClientC
super(config);
_cacheClient = getZkClient();
_rootEntry = cacheConfig.getRootEntry();
- _lazyCaching = cacheConfig.getLazyCaching();
_cacheData = cacheConfig.getCacheData();
_cacheChildren = cacheConfig.getCacheChildren();
+
+ if (_cacheData) {
+ _dataCacheMap = new ConcurrentHashMap<>();
+ }
+ if (_cacheChildren) {
+ _childrenCacheTree = new TrieNode(_rootEntry, null);
+ }
}
+ /**
+ * Get data for a given key.
+ * If datacache is enabled, will fetch for cache. If it doesn't exist
+ * returns null (for when initial populating cache is in progress).
+ * @param key key to identify the entry
+ * @return data for the key
+ */
@Override
- public Stat exists(String key) {
- throw new MetaClientException("Not implemented yet.");
+ public T get(final String key) {
+ if (_cacheData) {
+ T data = getDataCacheMap().get(key);
+ if (data == null) {
+ LOG.debug("Data not found in cache for key: {}. This could be because the cache is still being populated.", key);
+ }
+ return data;
+ }
+ return super.get(key);
}
@Override
- public T get(final String key) {
- throw new MetaClientException("Not implemented yet.");
+ public List<T> get(List<String> keys) {
+ List<T> dataList = new ArrayList<>();
+ for (String key : keys) {
+ dataList.add(get(key));
+ }
+ return dataList;
}
@Override
@@ -81,14 +112,100 @@ public class ZkMetaClientCache<T> extends ZkMetaClient<T> implements MetaClientC
throw new MetaClientException("Not implemented yet.");
}
- @Override
- public List<T> get(List<String> keys) {
- throw new MetaClientException("Not implemented yet.");
+ private void populateAllCache() {
+ // TODO: Concurrently populate children and data cache.
+ if (!_cacheClient.exists(_rootEntry)) {
+ LOG.warn("Root entry: {} does not exist.", _rootEntry);
+ // Let the other threads know that the cache is populated.
+ _initializedCache.countDown();
+ return;
+ }
+
+ Queue<String> queue = new ArrayDeque<>();
+ queue.add(_rootEntry);
+
+ while (!queue.isEmpty()) {
+ String node = queue.poll();
+ if (_cacheData) {
+ T dataRecord = _cacheClient.readData(node, true);
+ _dataCacheMap.put(node, dataRecord);
+ }
+ queue.addAll(_cacheClient.getChildren(node));
+ }
+ // Let the other threads know that the cache is populated.
+ _initializedCache.countDown();
}
- @Override
- public List<Stat> exists(List<String> keys) {
- throw new MetaClientException("Not implemented yet.");
+ private class CacheUpdateRunnable implements Runnable {
+ private final String path;
+ private final ChildChangeListener.ChangeType changeType;
+
+ public CacheUpdateRunnable(String path, ChildChangeListener.ChangeType changeType) {
+ this.path = path;
+ this.changeType = changeType;
+ }
+
+ @Override
+ public void run() {
+ waitForPopulateAllCache();
+ // TODO: HANDLE DEDUP EVENT CHANGES
+ switch (changeType) {
+ case ENTRY_CREATED:
+ // Not implemented yet.
+ modifyDataInCache(path, false);
+ break;
+ case ENTRY_DELETED:
+ // Not implemented yet.
+ modifyDataInCache(path, true);
+ break;
+ case ENTRY_DATA_CHANGE:
+ modifyDataInCache(path, false);
+ break;
+ default:
+ LOG.error("Unknown change type: " + changeType);
+ }
+ }
+ }
+
+ private void waitForPopulateAllCache() {
+ try {
+ _initializedCache.await();
+ } catch (InterruptedException e) {
+ throw new MetaClientException("Interrupted while waiting for cache to populate.", e);
+ }
+ }
+
+ private void modifyDataInCache(String path, Boolean isDelete) {
+ if (_cacheData) {
+ if (isDelete) {
+ getDataCacheMap().remove(path);
+ } else {
+ T dataRecord = _cacheClient.readData(path, true);
+ getDataCacheMap().put(path, dataRecord);
+ }
+ }
}
+ public ConcurrentHashMap<String, T> getDataCacheMap() {
+ return _dataCacheMap;
+ }
+
+ public TrieNode getChildrenCacheTree() {
+ return _childrenCacheTree;
+ }
+
+ /**
+ * Connect to the underlying ZkClient.
+ */
+ @Override
+ public void connect() {
+ super.connect();
+ _eventListener = (path, changeType) -> {
+ Runnable cacheUpdateRunnable = new CacheUpdateRunnable(path, changeType);
+ executor.execute(cacheUpdateRunnable);
+ };
+ executor = Executors.newSingleThreadExecutor();
+ _cacheClient.subscribePersistRecursiveListener(_rootEntry, new ChildListenerAdapter(_eventListener));
+ populateAllCache();
+ }
}
diff --git a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClientCache.java b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClientCache.java
index a3a5b4eee..30a0a729d 100644
--- a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClientCache.java
+++ b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClientCache.java
@@ -19,39 +19,100 @@ package org.apache.helix.metaclient.impl.zk;
* under the License.
*/
-
import org.apache.helix.metaclient.factories.MetaClientCacheConfig;
import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientConfig;
import org.testng.Assert;
import org.testng.annotations.Test;
+import java.util.ArrayList;
+import java.util.List;
+
public class TestZkMetaClientCache extends ZkMetaClientTestBase {
- private static final String PATH = "/Cache";
+ private static final String DATA_PATH = "/data";
+ private static final String DATA_VALUE = "testData";
@Test
public void testCreateClient() {
- final String key = "/TestZkMetaClientCache_testCreate";
- try (ZkMetaClient<String> zkMetaClientCache = createZkMetaClientCache()) {
+ final String key = "/testCreate";
+ try (ZkMetaClientCache<String> zkMetaClientCache = createZkMetaClientCacheLazyCaching(key)) {
zkMetaClientCache.connect();
// Perform some random non-read operation
zkMetaClientCache.create(key, ENTRY_STRING_VALUE);
+ }
+ }
+
+ @Test
+ public void testCacheDataUpdates() {
+ final String key = "/testCacheDataUpdates";
+ try (ZkMetaClientCache<String> zkMetaClientCache = createZkMetaClientCacheLazyCaching(key)) {
+ zkMetaClientCache.connect();
+ zkMetaClientCache.create(key, "test");
+ zkMetaClientCache.create(key + DATA_PATH, DATA_VALUE);
+
+ // Get data for DATA_PATH and cache it
+ String data = zkMetaClientCache.get(key + DATA_PATH);
+ Assert.assertEquals(data, zkMetaClientCache.getDataCacheMap().get(key + DATA_PATH));
+
+ // Update data for DATA_PATH
+ String newData = zkMetaClientCache.update(key + DATA_PATH, currentData -> currentData + "1");
+
+ // Verify that cached data is updated. Might take some time
+ for (int i = 0; i < 10; i++) {
+ if (zkMetaClientCache.getDataCacheMap().get(key + DATA_PATH).equals(newData)) {
+ break;
+ }
+ Thread.sleep(1000);
+ }
+ Assert.assertEquals(newData, zkMetaClientCache.getDataCacheMap().get(key + DATA_PATH));
+
+ zkMetaClientCache.delete(key + DATA_PATH);
+ // Verify that cached data is updated. Might take some time
+ for (int i = 0; i < 10; i++) {
+ if (zkMetaClientCache.getDataCacheMap().get(key + DATA_PATH) == null) {
+ break;
+ }
+ Thread.sleep(1000);
+ }
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Test
+ public void testBatchGet() {
+ final String key = "/testBatchGet";
+ try (ZkMetaClientCache<String> zkMetaClientCache = createZkMetaClientCacheLazyCaching(key)) {
+ zkMetaClientCache.connect();
+ zkMetaClientCache.create(key, "test");
+ zkMetaClientCache.create(key + DATA_PATH, DATA_VALUE);
+
+ ArrayList<String> keys = new ArrayList<>();
+ keys.add(key);
+ keys.add(key + DATA_PATH);
+
+ ArrayList<String> values = new ArrayList<>();
+ values.add("test");
+ values.add(DATA_VALUE);
- try {
- //Perform some read operation - should fail.
- // TODO: Remove this once implemented.
- zkMetaClientCache.get(key);
- Assert.fail("Should have failed with non implemented yet.");
- } catch (Exception ignored) {
+ for (int i = 0; i < 10; i++) {
+ // Get data for DATA_PATH and cache it
+ List<String> data = zkMetaClientCache.get(keys);
+ if (data.equals(values)) {
+ break;
+ }
+ Thread.sleep(1000);
}
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
}
}
- protected static ZkMetaClientCache<String> createZkMetaClientCache() {
+ protected static ZkMetaClientCache<String> createZkMetaClientCacheLazyCaching(String rootPath) {
ZkMetaClientConfig config =
new ZkMetaClientConfig.ZkMetaClientConfigBuilder().setConnectionAddress(ZK_ADDR)
//.setZkSerializer(new TestStringSerializer())
.build();
- MetaClientCacheConfig cacheConfig = new MetaClientCacheConfig(PATH, true, true, true);
+ MetaClientCacheConfig cacheConfig = new MetaClientCacheConfig(rootPath, true, true, true);
return new ZkMetaClientCache<>(config, cacheConfig);
}
}