You are viewing a plain-text version of this content. The canonical link to it (shown as "here" in the original page) was not preserved in this copy.
Posted to commits@helix.apache.org by xy...@apache.org on 2023/09/28 01:32:31 UTC

[helix] branch metaclient updated: Lattice cache - caching just data implementation (#2619)

This is an automated email from the ASF dual-hosted git repository.

xyuanlu pushed a commit to branch metaclient
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/metaclient by this push:
     new c147eb4c0 Lattice cache - caching just data implementation (#2619)
c147eb4c0 is described below

commit c147eb4c0e03a13c99d324d549e389d3d689baa3
Author: Marcos Rico Peng <55...@users.noreply.github.com>
AuthorDate: Wed Sep 27 18:32:25 2023 -0700

    Lattice cache - caching just data implementation (#2619)
    
    Lattice cache - caching just data implementation
    ---------
    
    Co-authored-by: mapeng <ma...@linkedin.com>
---
 .../metaclient/api/MetaClientCacheInterface.java   |   2 +-
 .../factories/MetaClientCacheConfig.java           |   5 -
 .../metaclient/factories/MetaClientFactory.java    |  25 ++--
 .../metaclient/impl/zk/ZkMetaClientCache.java      | 153 ++++++++++++++++++---
 .../metaclient/impl/zk/TestZkMetaClientCache.java  |  85 ++++++++++--
 5 files changed, 220 insertions(+), 50 deletions(-)

diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/api/MetaClientCacheInterface.java b/meta-client/src/main/java/org/apache/helix/metaclient/api/MetaClientCacheInterface.java
index 348bd0929..6449970bb 100644
--- a/meta-client/src/main/java/org/apache/helix/metaclient/api/MetaClientCacheInterface.java
+++ b/meta-client/src/main/java/org/apache/helix/metaclient/api/MetaClientCacheInterface.java
@@ -36,7 +36,7 @@ public interface MetaClientCacheInterface<T> extends MetaClientInterface<T> {
 
         private final String _nodeKey;
 
-        TrieNode(String path, String nodeKey) {
+        public TrieNode(String path, String nodeKey) {
             _path = path;
             _nodeKey = nodeKey;
             _children = new HashMap<>();
diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/factories/MetaClientCacheConfig.java b/meta-client/src/main/java/org/apache/helix/metaclient/factories/MetaClientCacheConfig.java
index 9e0323601..07972945a 100644
--- a/meta-client/src/main/java/org/apache/helix/metaclient/factories/MetaClientCacheConfig.java
+++ b/meta-client/src/main/java/org/apache/helix/metaclient/factories/MetaClientCacheConfig.java
@@ -25,13 +25,11 @@ public class MetaClientCacheConfig {
     private final String _rootEntry;
     private boolean _cacheData = false;
     private boolean _cacheChildren = false;
-    private boolean _lazyCaching = true;
 
     public MetaClientCacheConfig(String rootEntry, boolean cacheData, boolean cacheChildren, boolean lazyCaching) {
         _rootEntry = rootEntry;
         _cacheData = cacheData;
         _cacheChildren = cacheChildren;
-        _lazyCaching = lazyCaching;
     }
 
     public String getRootEntry() {
@@ -46,7 +44,4 @@ public class MetaClientCacheConfig {
         return _cacheChildren;
     }
 
-    public boolean getLazyCaching() {
-        return _lazyCaching;
-    }
 }
diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/factories/MetaClientFactory.java b/meta-client/src/main/java/org/apache/helix/metaclient/factories/MetaClientFactory.java
index ebb4549da..7cc86a8a9 100644
--- a/meta-client/src/main/java/org/apache/helix/metaclient/factories/MetaClientFactory.java
+++ b/meta-client/src/main/java/org/apache/helix/metaclient/factories/MetaClientFactory.java
@@ -40,13 +40,7 @@ public class MetaClientFactory {
       throw new IllegalArgumentException("MetaClientConfig cannot be null.");
     }
     if (MetaClientConfig.StoreType.ZOOKEEPER.equals(config.getStoreType())) {
-      ZkMetaClientConfig zkMetaClientConfig = new ZkMetaClientConfig.ZkMetaClientConfigBuilder().
-          setConnectionAddress(config.getConnectionAddress())
-          .setMetaClientReconnectPolicy(config.getMetaClientReconnectPolicy())
-          .setConnectionInitTimeoutInMillis(config.getConnectionInitTimeoutInMillis())
-          .setSessionTimeoutInMillis(config.getSessionTimeoutInMillis())
-          .build();
-      return new ZkMetaClientFactory().getMetaClient(zkMetaClientConfig);
+      return new ZkMetaClientFactory().getMetaClient(createZkMetaClientConfig(config));
     }
     return null;
   }
@@ -56,14 +50,17 @@ public class MetaClientFactory {
       throw new IllegalArgumentException("MetaClientConfig cannot be null.");
     }
     if (MetaClientConfig.StoreType.ZOOKEEPER.equals(config.getStoreType())) {
-      ZkMetaClientConfig zkMetaClientConfig = new ZkMetaClientConfig.ZkMetaClientConfigBuilder().
-              setConnectionAddress(config.getConnectionAddress())
-              .setMetaClientReconnectPolicy(config.getMetaClientReconnectPolicy())
-              .setConnectionInitTimeoutInMillis(config.getConnectionInitTimeoutInMillis())
-              .setSessionTimeoutInMillis(config.getSessionTimeoutInMillis())
-              .build();
-      return new ZkMetaClientFactory().getMetaClientCache(zkMetaClientConfig, cacheConfig);
+      return new ZkMetaClientFactory().getMetaClientCache(createZkMetaClientConfig(config), cacheConfig);
     }
       return null;
   }
+
+  private ZkMetaClientConfig createZkMetaClientConfig(MetaClientConfig config) {
+      return new ZkMetaClientConfig.ZkMetaClientConfigBuilder().
+        setConnectionAddress(config.getConnectionAddress())
+        .setMetaClientReconnectPolicy(config.getMetaClientReconnectPolicy())
+        .setConnectionInitTimeoutInMillis(config.getConnectionInitTimeoutInMillis())
+        .setSessionTimeoutInMillis(config.getSessionTimeoutInMillis())
+        .build();
+  }
 }
diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClientCache.java b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClientCache.java
index af1c9d791..66eb6f91e 100644
--- a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClientCache.java
+++ b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClientCache.java
@@ -21,31 +21,38 @@ package org.apache.helix.metaclient.impl.zk;
 
 import org.apache.helix.metaclient.api.ChildChangeListener;
 import org.apache.helix.metaclient.api.MetaClientCacheInterface;
-import org.apache.helix.metaclient.datamodel.DataRecord;
 import org.apache.helix.metaclient.exception.MetaClientException;
 import org.apache.helix.metaclient.factories.MetaClientCacheConfig;
-import org.apache.helix.metaclient.factories.MetaClientConfig;
+import org.apache.helix.metaclient.impl.zk.adapter.ChildListenerAdapter;
 import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientConfig;
-import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientFactory;
-import org.apache.helix.metaclient.recipes.lock.LockInfoSerializer;
 import org.apache.helix.zookeeper.zkclient.ZkClient;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayDeque;
+import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
+import java.util.Queue;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
 public class ZkMetaClientCache<T> extends ZkMetaClient<T> implements MetaClientCacheInterface<T> {
 
-    private Map<String, DataRecord> _dataCacheMap;
+    private ConcurrentHashMap<String, T> _dataCacheMap;
     private final String _rootEntry;
     private TrieNode _childrenCacheTree;
     private ChildChangeListener _eventListener;
     private boolean _cacheData;
     private boolean _cacheChildren;
-    private boolean _lazyCaching;
     private static final Logger LOG = LoggerFactory.getLogger(ZkMetaClientCache.class);
     private  ZkClient _cacheClient;
+    private ExecutorService executor;
+
+    // TODO: Look into using conditional variable instead of latch.
+    private final CountDownLatch _initializedCache = new CountDownLatch(1);
 
     /**
      * Constructor for ZkMetaClientCache.
@@ -56,19 +63,43 @@ public class ZkMetaClientCache<T> extends ZkMetaClient<T> implements MetaClientC
         super(config);
         _cacheClient = getZkClient();
         _rootEntry = cacheConfig.getRootEntry();
-        _lazyCaching = cacheConfig.getLazyCaching();
         _cacheData = cacheConfig.getCacheData();
         _cacheChildren = cacheConfig.getCacheChildren();
+
+        if (_cacheData) {
+            _dataCacheMap = new ConcurrentHashMap<>();
+        }
+        if (_cacheChildren) {
+            _childrenCacheTree = new TrieNode(_rootEntry, null);
+        }
     }
 
+    /**
+     * Get data for a given key.
+     * If datacache is enabled, will fetch for cache. If it doesn't exist
+     * returns null (for when initial populating cache is in progress).
+     * @param key key to identify the entry
+     * @return data for the key
+     */
     @Override
-    public Stat exists(String key) {
-        throw new MetaClientException("Not implemented yet.");
+    public T get(final String key) {
+        if (_cacheData) {
+            T data = getDataCacheMap().get(key);
+            if (data == null) {
+                LOG.debug("Data not found in cache for key: {}. This could be because the cache is still being populated.", key);
+            }
+            return data;
+        }
+        return super.get(key);
     }
 
     @Override
-    public T get(final String key) {
-        throw new MetaClientException("Not implemented yet.");
+    public List<T> get(List<String> keys) {
+        List<T> dataList = new ArrayList<>();
+        for (String key : keys) {
+            dataList.add(get(key));
+        }
+        return dataList;
     }
 
     @Override
@@ -81,14 +112,100 @@ public class ZkMetaClientCache<T> extends ZkMetaClient<T> implements MetaClientC
         throw new MetaClientException("Not implemented yet.");
     }
 
-    @Override
-    public List<T> get(List<String> keys) {
-        throw new MetaClientException("Not implemented yet.");
+    private void populateAllCache() {
+        // TODO: Concurrently populate children and data cache.
+        if (!_cacheClient.exists(_rootEntry)) {
+            LOG.warn("Root entry: {} does not exist.", _rootEntry);
+            // Let the other threads know that the cache is populated.
+            _initializedCache.countDown();
+            return;
+        }
+
+        Queue<String> queue = new ArrayDeque<>();
+        queue.add(_rootEntry);
+
+        while (!queue.isEmpty()) {
+            String node = queue.poll();
+            if (_cacheData) {
+                T dataRecord = _cacheClient.readData(node, true);
+                _dataCacheMap.put(node, dataRecord);
+            }
+            queue.addAll(_cacheClient.getChildren(node));
+        }
+        // Let the other threads know that the cache is populated.
+        _initializedCache.countDown();
     }
 
-    @Override
-    public List<Stat> exists(List<String> keys) {
-        throw new MetaClientException("Not implemented yet.");
+    private class CacheUpdateRunnable implements Runnable {
+        private final String path;
+        private final ChildChangeListener.ChangeType changeType;
+
+        public CacheUpdateRunnable(String path, ChildChangeListener.ChangeType changeType) {
+            this.path = path;
+            this.changeType = changeType;
+        }
+
+        @Override
+        public void run() {
+            waitForPopulateAllCache();
+            //  TODO: HANDLE DEDUP EVENT CHANGES
+            switch (changeType) {
+                case ENTRY_CREATED:
+                    // Not implemented yet.
+                    modifyDataInCache(path, false);
+                    break;
+                case ENTRY_DELETED:
+                    // Not implemented yet.
+                    modifyDataInCache(path, true);
+                    break;
+                case ENTRY_DATA_CHANGE:
+                    modifyDataInCache(path, false);
+                    break;
+                default:
+                    LOG.error("Unknown change type: " + changeType);
+            }
+        }
+    }
+
+    private void waitForPopulateAllCache() {
+        try {
+            _initializedCache.await();
+        } catch (InterruptedException e) {
+            throw new MetaClientException("Interrupted while waiting for cache to populate.", e);
+        }
+    }
+
+    private void modifyDataInCache(String path, Boolean isDelete) {
+        if (_cacheData) {
+            if (isDelete) {
+                getDataCacheMap().remove(path);
+            } else {
+                T dataRecord = _cacheClient.readData(path, true);
+                getDataCacheMap().put(path, dataRecord);
+            }
+        }
     }
 
+    public ConcurrentHashMap<String, T> getDataCacheMap() {
+        return _dataCacheMap;
+    }
+
+    public TrieNode getChildrenCacheTree() {
+        return _childrenCacheTree;
+    }
+
+    /**
+     * Connect to the underlying ZkClient.
+     */
+    @Override
+    public void connect() {
+        super.connect();
+        _eventListener = (path, changeType) -> {
+            Runnable cacheUpdateRunnable = new CacheUpdateRunnable(path, changeType);
+            executor.execute(cacheUpdateRunnable);
+        };
+        executor = Executors.newSingleThreadExecutor();
+        _cacheClient.subscribePersistRecursiveListener(_rootEntry, new ChildListenerAdapter(_eventListener));
+        populateAllCache();
+    }
 }
diff --git a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClientCache.java b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClientCache.java
index a3a5b4eee..30a0a729d 100644
--- a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClientCache.java
+++ b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClientCache.java
@@ -19,39 +19,100 @@ package org.apache.helix.metaclient.impl.zk;
  * under the License.
  */
 
-
 import org.apache.helix.metaclient.factories.MetaClientCacheConfig;
 import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientConfig;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
+import java.util.ArrayList;
+import java.util.List;
+
 public class TestZkMetaClientCache extends ZkMetaClientTestBase {
-    private static final String PATH = "/Cache";
+    private static final String DATA_PATH = "/data";
+    private static final String DATA_VALUE = "testData";
 
     @Test
     public void testCreateClient() {
-        final String key = "/TestZkMetaClientCache_testCreate";
-        try (ZkMetaClient<String> zkMetaClientCache = createZkMetaClientCache()) {
+        final String key = "/testCreate";
+        try (ZkMetaClientCache<String> zkMetaClientCache = createZkMetaClientCacheLazyCaching(key)) {
             zkMetaClientCache.connect();
             // Perform some random non-read operation
             zkMetaClientCache.create(key, ENTRY_STRING_VALUE);
+        }
+    }
+
+    @Test
+    public void testCacheDataUpdates() {
+        final String key = "/testCacheDataUpdates";
+        try (ZkMetaClientCache<String> zkMetaClientCache = createZkMetaClientCacheLazyCaching(key)) {
+            zkMetaClientCache.connect();
+            zkMetaClientCache.create(key, "test");
+            zkMetaClientCache.create(key + DATA_PATH, DATA_VALUE);
+
+            // Get data for DATA_PATH and cache it
+            String data = zkMetaClientCache.get(key + DATA_PATH);
+            Assert.assertEquals(data, zkMetaClientCache.getDataCacheMap().get(key + DATA_PATH));
+
+            // Update data for DATA_PATH
+            String newData = zkMetaClientCache.update(key + DATA_PATH, currentData -> currentData + "1");
+
+            // Verify that cached data is updated. Might take some time
+            for (int i = 0; i < 10; i++) {
+                if (zkMetaClientCache.getDataCacheMap().get(key + DATA_PATH).equals(newData)) {
+                    break;
+                }
+                Thread.sleep(1000);
+            }
+            Assert.assertEquals(newData, zkMetaClientCache.getDataCacheMap().get(key + DATA_PATH));
+
+            zkMetaClientCache.delete(key + DATA_PATH);
+            // Verify that cached data is updated. Might take some time
+            for (int i = 0; i < 10; i++) {
+                if (zkMetaClientCache.getDataCacheMap().get(key + DATA_PATH) == null) {
+                    break;
+                }
+                Thread.sleep(1000);
+            }
+        } catch (InterruptedException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Test
+    public void testBatchGet() {
+        final String key = "/testBatchGet";
+        try (ZkMetaClientCache<String> zkMetaClientCache = createZkMetaClientCacheLazyCaching(key)) {
+            zkMetaClientCache.connect();
+            zkMetaClientCache.create(key, "test");
+            zkMetaClientCache.create(key + DATA_PATH, DATA_VALUE);
+
+            ArrayList<String> keys = new ArrayList<>();
+            keys.add(key);
+            keys.add(key + DATA_PATH);
+
+            ArrayList<String> values = new ArrayList<>();
+            values.add("test");
+            values.add(DATA_VALUE);
 
-            try {
-                //Perform some read operation - should fail.
-                // TODO: Remove this once implemented.
-                zkMetaClientCache.get(key);
-                Assert.fail("Should have failed with non implemented yet.");
-            } catch (Exception ignored) {
+            for (int i = 0; i < 10; i++) {
+                // Get data for DATA_PATH and cache it
+                List<String> data = zkMetaClientCache.get(keys);
+                if (data.equals(values)) {
+                    break;
+                }
+                Thread.sleep(1000);
             }
+        } catch (InterruptedException e) {
+            throw new RuntimeException(e);
         }
     }
 
-    protected static ZkMetaClientCache<String> createZkMetaClientCache() {
+    protected static ZkMetaClientCache<String> createZkMetaClientCacheLazyCaching(String rootPath) {
         ZkMetaClientConfig config =
                 new ZkMetaClientConfig.ZkMetaClientConfigBuilder().setConnectionAddress(ZK_ADDR)
                         //.setZkSerializer(new TestStringSerializer())
                         .build();
-        MetaClientCacheConfig cacheConfig = new MetaClientCacheConfig(PATH, true, true, true);
+        MetaClientCacheConfig cacheConfig = new MetaClientCacheConfig(rootPath, true, true, true);
         return new ZkMetaClientCache<>(config, cacheConfig);
     }
 }