You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by hu...@apache.org on 2020/08/14 18:14:06 UTC

[helix] 11/12: Implement throttling for routing data update on cache miss

This is an automated email from the ASF dual-hosted git repository.

hulee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git

commit 8c2059c78130f8138d6295bef27d76ac5a413aaf
Author: Hunter Lee <hu...@linkedin.com>
AuthorDate: Mon Jul 27 18:36:39 2020 -0700

    Implement throttling for routing data update on cache miss
    
    This commit implements throttling for routing data updates on cache miss: FederatedZkClient skips re-reading routing data from the routing data source if RoutingDataManager's cache was reset less than a configured interval ago. The interval defaults to 5 seconds and can be overridden via the "routing.data.update.interval.ms" system property.
---
 ...PropertyKeys.java => RoutingDataConstants.java} | 14 ++---
 .../constant/RoutingSystemPropertyKeys.java        |  5 ++
 .../zookeeper/impl/client/FederatedZkClient.java   | 38 ++++++++++++
 .../zookeeper/routing/RoutingDataManager.java      | 12 ++++
 .../impl/client/TestFederatedZkClient.java         | 70 +++++++++++++++++++++-
 5 files changed, 128 insertions(+), 11 deletions(-)

diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/constant/RoutingSystemPropertyKeys.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/constant/RoutingDataConstants.java
similarity index 66%
copy from zookeeper-api/src/main/java/org/apache/helix/zookeeper/constant/RoutingSystemPropertyKeys.java
copy to zookeeper-api/src/main/java/org/apache/helix/zookeeper/constant/RoutingDataConstants.java
index a57075b..164c543 100644
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/constant/RoutingSystemPropertyKeys.java
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/constant/RoutingDataConstants.java
@@ -19,15 +19,16 @@ package org.apache.helix.zookeeper.constant;
   * under the License.
   */
 
 /**
- * This class contains various routing-related system property keys for multi-zk clients.
+ * This class contains routing-related constants, such as default values, for multi-zk clients.
  */
-public class RoutingSystemPropertyKeys {
+public class RoutingDataConstants {
 
   /**
-   * If enabled, FederatedZkClient (multiZkClient) will invalidate the cached routing data and
-   * re-read the routing data from the routing data source upon ZK path sharding key cache miss.
+   * Default interval that defines how frequently RoutingDataManager's routing data should be
+   * updated from the routing data source. This exists to apply throttling to the rate at which
+   * the ZkClient pulls routing data from the routing data source to avoid overloading the routing
+   * data source.
    */
-  public static final String UPDATE_ROUTING_DATA_ON_CACHE_MISS =
-      "update.routing.data.on.cache.miss.enabled";
+  public static final long DEFAULT_ROUTING_DATA_UPDATE_INTERVAL_MS = 5 * 1000L; // 5 seconds
 }
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/constant/RoutingSystemPropertyKeys.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/constant/RoutingSystemPropertyKeys.java
index a57075b..e22ad08 100644
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/constant/RoutingSystemPropertyKeys.java
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/constant/RoutingSystemPropertyKeys.java
@@ -30,4 +30,9 @@ public class RoutingSystemPropertyKeys {
    */
   public static final String UPDATE_ROUTING_DATA_ON_CACHE_MISS =
       "update.routing.data.on.cache.miss.enabled";
+
+  /**
+   * Minimum interval, in milliseconds, between routing data updates from the routing data source.
+   */
+  public static final String ROUTING_DATA_UPDATE_INTERVAL_MS = "routing.data.update.interval.ms";
 }
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/FederatedZkClient.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/FederatedZkClient.java
index 4354537..dc55d53 100644
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/FederatedZkClient.java
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/FederatedZkClient.java
@@ -30,6 +30,7 @@ import org.apache.helix.msdcommon.datamodel.MetadataStoreRoutingData;
 import org.apache.helix.msdcommon.exception.InvalidRoutingDataException;
 import org.apache.helix.zookeeper.api.client.ChildrenSubscribeResult;
 import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
+import org.apache.helix.zookeeper.constant.RoutingDataConstants;
 import org.apache.helix.zookeeper.constant.RoutingSystemPropertyKeys;
 import org.apache.helix.zookeeper.exception.MultiZkException;
 import org.apache.helix.zookeeper.impl.factory.DedicatedZkClientFactory;
@@ -86,6 +87,7 @@ public class FederatedZkClient implements RealmAwareZkClient {
   private PathBasedZkSerializer _pathBasedZkSerializer;
   private final boolean _routingDataUpdateOnCacheMissEnabled = Boolean.parseBoolean(
       System.getProperty(RoutingSystemPropertyKeys.UPDATE_ROUTING_DATA_ON_CACHE_MISS));
+  private long _routingDataUpdateInterval;
 
   // TODO: support capacity of ZkClient number in one FederatedZkClient and do garbage collection.
   public FederatedZkClient(RealmAwareZkClient.RealmAwareZkConnectionConfig connectionConfig,
@@ -102,6 +104,7 @@ public class FederatedZkClient implements RealmAwareZkClient {
     _clientConfig = clientConfig;
     _pathBasedZkSerializer = clientConfig.getZkSerializer();
     _zkRealmToZkClientMap = new ConcurrentHashMap<>();
+    _routingDataUpdateInterval = resolveRoutingDataUpdateInterval();
   }
 
   @Override
@@ -587,6 +590,11 @@ public class FederatedZkClient implements RealmAwareZkClient {
               try {
                 zkRealm = _metadataStoreRoutingData.getMetadataStoreRealm(path);
               } catch (NoSuchElementException e4) {
+                if (shouldThrottleRead()) {
+                  // If routing data update from routing data source has taken place recently,
+                  // then just skip the update and throw the exception
+                  throw e4;
+                }
                 // Try 2) Reset RoutingDataManager and re-read the routing data from routing data
                 // source via I/O. Since RoutingDataManager's cache doesn't have it either, so we
                 // synchronize on all threads by locking on FederatedZkClient.class.
@@ -626,4 +634,43 @@ public class FederatedZkClient implements RealmAwareZkClient {
             + ". Instead, please use " + DEDICATED_ZK_CLIENT_FACTORY
             + " to create a dedicated RealmAwareZkClient for this operation.");
   }
+
+  /**
+   * Resolves the routing data update interval (in milliseconds) from System Properties.
+   * Falls back to {@link RoutingDataConstants#DEFAULT_ROUTING_DATA_UPDATE_INTERVAL_MS} when the
+   * property is unset, cannot be parsed as a long, or is negative.
+   * @return the interval used to throttle routing data updates on sharding key cache miss
+   */
+  private long resolveRoutingDataUpdateInterval() {
+    String intervalValue =
+        System.getProperty(RoutingSystemPropertyKeys.ROUTING_DATA_UPDATE_INTERVAL_MS);
+    if (intervalValue == null) {
+      // Not configured: this is the normal case, so use the default without logging a warning.
+      return RoutingDataConstants.DEFAULT_ROUTING_DATA_UPDATE_INTERVAL_MS;
+    }
+    try {
+      long interval = Long.parseLong(intervalValue.trim());
+      if (interval >= 0) {
+        return interval;
+      }
+      LOG.warn("FederatedZkClient::resolveRoutingDataUpdateInterval(): invalid value: {} given "
+              + "for ROUTING_DATA_UPDATE_INTERVAL_MS, using the default value ({} ms) instead!",
+          interval, RoutingDataConstants.DEFAULT_ROUTING_DATA_UPDATE_INTERVAL_MS);
+    } catch (NumberFormatException e) {
+      LOG.warn("FederatedZkClient::resolveRoutingDataUpdateInterval(): failed to parse value: {} "
+              + "for ROUTING_DATA_UPDATE_INTERVAL_MS, using the default value ({} ms) instead!",
+          intervalValue, RoutingDataConstants.DEFAULT_ROUTING_DATA_UPDATE_INTERVAL_MS, e);
+    }
+    return RoutingDataConstants.DEFAULT_ROUTING_DATA_UPDATE_INTERVAL_MS;
+  }
+
+  /**
+   * Returns whether a read from the routing data source should be throttled, i.e. whether
+   * RoutingDataManager was reset less than the configured routing data update interval ago.
+   * @return true if the routing data update should be skipped for now
+   */
+  private boolean shouldThrottleRead() {
+    return System.currentTimeMillis() - RoutingDataManager.getInstance().getLastResetTimestamp()
+        < _routingDataUpdateInterval;
+  }
 }
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/routing/RoutingDataManager.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/routing/RoutingDataManager.java
index 6df9616..853bd5c 100644
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/routing/RoutingDataManager.java
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/routing/RoutingDataManager.java
@@ -51,6 +51,9 @@ public class RoutingDataManager {
   private final Map<String, MetadataStoreRoutingData> _metadataStoreRoutingDataMap =
       new ConcurrentHashMap<>();
 
+  // Time (epoch ms) of the last reset(); lets callers throttle routing data re-reads.
+  private volatile long _lastResetTimestamp;
+
   // Singleton instance
   private static RoutingDataManager _instance;
 
@@ -164,6 +167,15 @@ public class RoutingDataManager {
     _metadataStoreRoutingDataMap.clear();
     _defaultMsdsEndpoint =
         System.getProperty(MetadataStoreRoutingConstants.MSDS_SERVER_ENDPOINT_KEY);
+    _lastResetTimestamp = System.currentTimeMillis();
+  }
+
+  /**
+   * Returns the System.currentTimeMillis() value recorded by the last reset() call.
+   * @return the last reset() time in epoch milliseconds (0 if it has never been set)
+   */
+  public long getLastResetTimestamp() {
+    return _lastResetTimestamp;
   }
 
   /**
diff --git a/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestFederatedZkClient.java b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestFederatedZkClient.java
index 93e5892..e201905 100644
--- a/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestFederatedZkClient.java
+++ b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestFederatedZkClient.java
@@ -279,6 +279,8 @@ public class TestFederatedZkClient extends RealmAwareZkClientTestBase {
       throws IOException, InvalidRoutingDataException {
     // Enable routing data update upon cache miss
     System.setProperty(RoutingSystemPropertyKeys.UPDATE_ROUTING_DATA_ON_CACHE_MISS, "true");
+    // Set the routing data update interval to 0 so there's no delay in testing
+    System.setProperty(RoutingSystemPropertyKeys.ROUTING_DATA_UPDATE_INTERVAL_MS, "0");
 
     RoutingDataManager.getInstance().getMetadataStoreRoutingData();
     _msdsServer.stopServer();
@@ -375,7 +377,8 @@ public class TestFederatedZkClient extends RealmAwareZkClientTestBase {
     // Shut down MSDS
     _msdsServer.stopServer();
     // Disable System property
-    System.setProperty(RoutingSystemPropertyKeys.UPDATE_ROUTING_DATA_ON_CACHE_MISS, "false");
+    System.clearProperty(RoutingSystemPropertyKeys.UPDATE_ROUTING_DATA_ON_CACHE_MISS);
+    System.clearProperty(RoutingSystemPropertyKeys.ROUTING_DATA_UPDATE_INTERVAL_MS);
   }
 
   /**
@@ -402,6 +405,8 @@ public class TestFederatedZkClient extends RealmAwareZkClientTestBase {
 
     // Enable routing data update upon cache miss
     System.setProperty(RoutingSystemPropertyKeys.UPDATE_ROUTING_DATA_ON_CACHE_MISS, "true");
+    // Set the routing data update interval to 0 so there's no delay in testing
+    System.setProperty(RoutingSystemPropertyKeys.ROUTING_DATA_UPDATE_INTERVAL_MS, "0");
 
     RoutingDataManager.getInstance().reset();
     RoutingDataManager.getInstance().getMetadataStoreRoutingData(RoutingDataReaderType.ZK, zkRealm);
@@ -496,7 +501,66 @@ public class TestFederatedZkClient extends RealmAwareZkClientTestBase {
     zkClient.deleteRecursively(MetadataStoreRoutingConstants.ROUTING_DATA_PATH);
     zkClient.close();
     // Disable System property
-    System.setProperty(RoutingSystemPropertyKeys.UPDATE_ROUTING_DATA_ON_CACHE_MISS, "false");
+    System.clearProperty(RoutingSystemPropertyKeys.UPDATE_ROUTING_DATA_ON_CACHE_MISS);
+    System.clearProperty(RoutingSystemPropertyKeys.ROUTING_DATA_UPDATE_INTERVAL_MS);
+  }
+
+  /**
+   * Test that throttle based on last reset timestamp works correctly. Here, we use ZK as the
+   * routing data source.
+   * Test scenario: set the throttle value to a high value and check that routing data update from
+   * the routing data source does NOT happen (because it would be throttled).
+   */
+  @Test(dependsOnMethods = "testUpdateRoutingDataOnCacheMissZK")
+  public void testRoutingDataUpdateThrottle() throws InvalidRoutingDataException {
+    // Call reset to set the last reset() timestamp in RoutingDataManager
+    RoutingDataManager.getInstance().reset();
+
+    // Set up routing data in ZK with TEST_KEY_LIST_1 as the realm's sharding keys
+    String zkRealm = "localhost:2127";
+    String newShardingKey = "/throttle";
+    ZkClient zkClient =
+        new ZkClient.Builder().setZkServer(zkRealm).setZkSerializer(new ZNRecordSerializer())
+            .build();
+    zkClient.create(MetadataStoreRoutingConstants.ROUTING_DATA_PATH, null, CreateMode.PERSISTENT);
+    ZNRecord zkRealmRecord = new ZNRecord(zkRealm);
+    zkRealmRecord.setListField(MetadataStoreRoutingConstants.ZNRECORD_LIST_FIELD_KEY,
+        new ArrayList<>(TestConstants.TEST_KEY_LIST_1));
+    zkClient.create(MetadataStoreRoutingConstants.ROUTING_DATA_PATH + "/" + zkRealm, zkRealmRecord,
+        CreateMode.PERSISTENT);
+
+    // Enable routing data update upon cache miss
+    System.setProperty(RoutingSystemPropertyKeys.UPDATE_ROUTING_DATA_ON_CACHE_MISS, "true");
+    // Set the throttle value to a very long value
+    System.setProperty(RoutingSystemPropertyKeys.ROUTING_DATA_UPDATE_INTERVAL_MS,
+        String.valueOf(Integer.MAX_VALUE));
+
+    // Create a new FederatedZkClient; it reads the interval (Integer.MAX_VALUE ms) when constructed
+    FederatedZkClient federatedZkClient = new FederatedZkClient(
+        new RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder()
+            .setRoutingDataSourceType(RoutingDataReaderType.ZK.name())
+            .setRoutingDataSourceEndpoint(zkRealm).build(),
+        new RealmAwareZkClient.RealmAwareZkClientConfig());
+
+    // Add newShardingKey to ZK's routing data
+    zkRealmRecord.getListField(MetadataStoreRoutingConstants.ZNRECORD_LIST_FIELD_KEY)
+        .add(newShardingKey);
+    zkClient
+        .writeData(MetadataStoreRoutingConstants.ROUTING_DATA_PATH + "/" + zkRealm, zkRealmRecord);
+
+    try {
+      Assert.assertFalse(federatedZkClient.exists(newShardingKey));
+      Assert.fail("NoSuchElementException expected!");
+    } catch (NoSuchElementException e) {
+      // Expected: the throttle skips re-reading the routing data source, so the key is unknown
+    }
+
+    // Clean up
+    zkClient.deleteRecursively(MetadataStoreRoutingConstants.ROUTING_DATA_PATH);
+    zkClient.close();
+    federatedZkClient.close();
+    System.clearProperty(RoutingSystemPropertyKeys.UPDATE_ROUTING_DATA_ON_CACHE_MISS);
+    System.clearProperty(RoutingSystemPropertyKeys.ROUTING_DATA_UPDATE_INTERVAL_MS);
   }
 
   /*
@@ -504,7 +568,7 @@ public class TestFederatedZkClient extends RealmAwareZkClientTestBase {
    * TODO: test that all raw zkClients are closed after FederatedZkClient close() is called. This
    *  could help avoid ZkClient leakage.
    */
-  @Test(dependsOnMethods = "testUpdateRoutingDataOnCacheMissZK")
+  @Test(dependsOnMethods = "testRoutingDataUpdateThrottle")
   public void testClose() {
     Assert.assertFalse(_realmAwareZkClient.isClosed());