You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by hu...@apache.org on 2020/08/14 18:14:06 UTC
[helix] 11/12: Implement throttling for routing data update on cache miss
This is an automated email from the ASF dual-hosted git repository.
hulee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
commit 8c2059c78130f8138d6295bef27d76ac5a413aaf
Author: Hunter Lee <hu...@linkedin.com>
AuthorDate: Mon Jul 27 18:36:39 2020 -0700
Implement throttling for routing data update on cache miss
This commit implements throttling for routing data updates by recording, in RoutingDataManager, a timestamp of the last time the cache was reset. It defines a default interval (5 seconds) and makes this interval configurable via a System property.
---
...PropertyKeys.java => RoutingDataConstants.java} | 14 ++---
.../constant/RoutingSystemPropertyKeys.java | 5 ++
.../zookeeper/impl/client/FederatedZkClient.java | 38 ++++++++++++
.../zookeeper/routing/RoutingDataManager.java | 12 ++++
.../impl/client/TestFederatedZkClient.java | 70 +++++++++++++++++++++-
5 files changed, 128 insertions(+), 11 deletions(-)
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/constant/RoutingSystemPropertyKeys.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/constant/RoutingDataConstants.java
similarity index 66%
copy from zookeeper-api/src/main/java/org/apache/helix/zookeeper/constant/RoutingSystemPropertyKeys.java
copy to zookeeper-api/src/main/java/org/apache/helix/zookeeper/constant/RoutingDataConstants.java
index a57075b..164c543 100644
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/constant/RoutingSystemPropertyKeys.java
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/constant/RoutingDataConstants.java
@@ -19,15 +19,13 @@ package org.apache.helix.zookeeper.constant;
* under the License.
*/
-/**
- * This class contains various routing-related system property keys for multi-zk clients.
- */
-public class RoutingSystemPropertyKeys {
+public class RoutingDataConstants {
/**
- * If enabled, FederatedZkClient (multiZkClient) will invalidate the cached routing data and
- * re-read the routing data from the routing data source upon ZK path sharding key cache miss.
+ * Default interval that defines how frequently RoutingDataManager's routing data should be
+ * updated from the routing data source. This exists to apply throttling to the rate at which
+ * the ZkClient pulls routing data from the routing data source to avoid overloading the routing
+ * data source.
*/
- public static final String UPDATE_ROUTING_DATA_ON_CACHE_MISS =
- "update.routing.data.on.cache.miss.enabled";
+ public static final long DEFAULT_ROUTING_DATA_UPDATE_INTERVAL_MS = 5 * 1000L; // 5 seconds
}
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/constant/RoutingSystemPropertyKeys.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/constant/RoutingSystemPropertyKeys.java
index a57075b..e22ad08 100644
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/constant/RoutingSystemPropertyKeys.java
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/constant/RoutingSystemPropertyKeys.java
@@ -30,4 +30,9 @@ public class RoutingSystemPropertyKeys {
*/
public static final String UPDATE_ROUTING_DATA_ON_CACHE_MISS =
"update.routing.data.on.cache.miss.enabled";
+
+ /**
+ * The interval to use between routing data updates from the routing data source.
+ */
+ public static final String ROUTING_DATA_UPDATE_INTERVAL_MS = "routing.data.update.interval.ms";
}
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/FederatedZkClient.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/FederatedZkClient.java
index 4354537..dc55d53 100644
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/FederatedZkClient.java
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/FederatedZkClient.java
@@ -30,6 +30,7 @@ import org.apache.helix.msdcommon.datamodel.MetadataStoreRoutingData;
import org.apache.helix.msdcommon.exception.InvalidRoutingDataException;
import org.apache.helix.zookeeper.api.client.ChildrenSubscribeResult;
import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
+import org.apache.helix.zookeeper.constant.RoutingDataConstants;
import org.apache.helix.zookeeper.constant.RoutingSystemPropertyKeys;
import org.apache.helix.zookeeper.exception.MultiZkException;
import org.apache.helix.zookeeper.impl.factory.DedicatedZkClientFactory;
@@ -86,6 +87,7 @@ public class FederatedZkClient implements RealmAwareZkClient {
private PathBasedZkSerializer _pathBasedZkSerializer;
private final boolean _routingDataUpdateOnCacheMissEnabled = Boolean.parseBoolean(
System.getProperty(RoutingSystemPropertyKeys.UPDATE_ROUTING_DATA_ON_CACHE_MISS));
+ private long _routingDataUpdateInterval;
// TODO: support capacity of ZkClient number in one FederatedZkClient and do garbage collection.
public FederatedZkClient(RealmAwareZkClient.RealmAwareZkConnectionConfig connectionConfig,
@@ -102,6 +104,7 @@ public class FederatedZkClient implements RealmAwareZkClient {
_clientConfig = clientConfig;
_pathBasedZkSerializer = clientConfig.getZkSerializer();
_zkRealmToZkClientMap = new ConcurrentHashMap<>();
+ getRoutingDataUpdateInterval();
}
@Override
@@ -587,6 +590,11 @@ public class FederatedZkClient implements RealmAwareZkClient {
try {
zkRealm = _metadataStoreRoutingData.getMetadataStoreRealm(path);
} catch (NoSuchElementException e4) {
+ if (shouldThrottleRead()) {
+ // If routing data update from routing data source has taken place recently,
+ // then just skip the update and throw the exception
+ throw e4;
+ }
// Try 2) Reset RoutingDataManager and re-read the routing data from routing data
// source via I/O. Since RoutingDataManager's cache doesn't have it either, so we
// synchronize on all threads by locking on FederatedZkClient.class.
@@ -626,4 +634,34 @@ public class FederatedZkClient implements RealmAwareZkClient {
+ ". Instead, please use " + DEDICATED_ZK_CLIENT_FACTORY
+ " to create a dedicated RealmAwareZkClient for this operation.");
}
+
+  /**
+   * Resolves the routing data update interval (in milliseconds) from System Properties and stores
+   * it in _routingDataUpdateInterval.
+   * Falls back to RoutingDataConstants.DEFAULT_ROUTING_DATA_UPDATE_INTERVAL_MS when the property
+   * RoutingSystemPropertyKeys.ROUTING_DATA_UPDATE_INTERVAL_MS is not set, cannot be parsed as a
+   * long, or is negative. A value of 0 is accepted as-is.
+   */
+  private void getRoutingDataUpdateInterval() {
+    String configuredInterval =
+        System.getProperty(RoutingSystemPropertyKeys.ROUTING_DATA_UPDATE_INTERVAL_MS);
+    if (configuredInterval == null) {
+      // The property is optional. An unset property is the normal case and must not be reported
+      // as a parse failure (Long.parseLong(null) throws NumberFormatException).
+      _routingDataUpdateInterval = RoutingDataConstants.DEFAULT_ROUTING_DATA_UPDATE_INTERVAL_MS;
+      return;
+    }
+    try {
+      long parsedInterval = Long.parseLong(configuredInterval);
+      if (parsedInterval < 0) {
+        LOG.warn("FederatedZkClient::getRoutingDataUpdateInterval(): invalid value: {} given for "
+                + "ROUTING_DATA_UPDATE_INTERVAL_MS, using the default value (5 sec) instead!",
+            parsedInterval);
+        parsedInterval = RoutingDataConstants.DEFAULT_ROUTING_DATA_UPDATE_INTERVAL_MS;
+      }
+      _routingDataUpdateInterval = parsedInterval;
+    } catch (NumberFormatException e) {
+      LOG.warn("FederatedZkClient::getRoutingDataUpdateInterval(): failed to parse "
+          + "ROUTING_DATA_UPDATE_INTERVAL_MS, using the default value (5 sec) instead!", e);
+      _routingDataUpdateInterval = RoutingDataConstants.DEFAULT_ROUTING_DATA_UPDATE_INTERVAL_MS;
+    }
+  }
+
+  /**
+   * Decides whether a read from the routing data source should be skipped because
+   * RoutingDataManager was reset too recently.
+   * @return true if the time elapsed since RoutingDataManager's last reset is shorter than the
+   *         resolved routing data update interval (_routingDataUpdateInterval); false otherwise
+   */
+  private boolean shouldThrottleRead() {
+    long now = System.currentTimeMillis();
+    long lastResetTimestamp = RoutingDataManager.getInstance().getLastResetTimestamp();
+    long elapsedSinceLastReset = now - lastResetTimestamp;
+    return elapsedSinceLastReset < _routingDataUpdateInterval;
+  }
}
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/routing/RoutingDataManager.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/routing/RoutingDataManager.java
index 6df9616..853bd5c 100644
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/routing/RoutingDataManager.java
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/routing/RoutingDataManager.java
@@ -51,6 +51,9 @@ public class RoutingDataManager {
private final Map<String, MetadataStoreRoutingData> _metadataStoreRoutingDataMap =
new ConcurrentHashMap<>();
+ // Tracks the time at which reset() was called last. Used to throttle reset()
+ private volatile long _lastResetTimestamp;
+
// Singleton instance
private static RoutingDataManager _instance;
@@ -164,6 +167,15 @@ public class RoutingDataManager {
_metadataStoreRoutingDataMap.clear();
_defaultMsdsEndpoint =
System.getProperty(MetadataStoreRoutingConstants.MSDS_SERVER_ENDPOINT_KEY);
+ _lastResetTimestamp = System.currentTimeMillis();
+ }
+
+ /**
+ * Returns the timestamp for the last reset().
+ * @return the System.currentTimeMillis() value recorded by the most recent reset() call
+ * (presumably 0 if reset() has never been called - confirm no other code writes the field)
+ */
+ public long getLastResetTimestamp() {
+ return _lastResetTimestamp;
}
/**
diff --git a/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestFederatedZkClient.java b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestFederatedZkClient.java
index 93e5892..e201905 100644
--- a/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestFederatedZkClient.java
+++ b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestFederatedZkClient.java
@@ -279,6 +279,8 @@ public class TestFederatedZkClient extends RealmAwareZkClientTestBase {
throws IOException, InvalidRoutingDataException {
// Enable routing data update upon cache miss
System.setProperty(RoutingSystemPropertyKeys.UPDATE_ROUTING_DATA_ON_CACHE_MISS, "true");
+ // Set the routing data update interval to 0 so there's no delay in testing
+ System.setProperty(RoutingSystemPropertyKeys.ROUTING_DATA_UPDATE_INTERVAL_MS, "0");
RoutingDataManager.getInstance().getMetadataStoreRoutingData();
_msdsServer.stopServer();
@@ -375,7 +377,8 @@ public class TestFederatedZkClient extends RealmAwareZkClientTestBase {
// Shut down MSDS
_msdsServer.stopServer();
// Disable System property
- System.setProperty(RoutingSystemPropertyKeys.UPDATE_ROUTING_DATA_ON_CACHE_MISS, "false");
+ System.clearProperty(RoutingSystemPropertyKeys.UPDATE_ROUTING_DATA_ON_CACHE_MISS);
+ System.clearProperty(RoutingSystemPropertyKeys.ROUTING_DATA_UPDATE_INTERVAL_MS);
}
/**
@@ -402,6 +405,8 @@ public class TestFederatedZkClient extends RealmAwareZkClientTestBase {
// Enable routing data update upon cache miss
System.setProperty(RoutingSystemPropertyKeys.UPDATE_ROUTING_DATA_ON_CACHE_MISS, "true");
+ // Set the routing data update interval to 0 so there's no delay in testing
+ System.setProperty(RoutingSystemPropertyKeys.ROUTING_DATA_UPDATE_INTERVAL_MS, "0");
RoutingDataManager.getInstance().reset();
RoutingDataManager.getInstance().getMetadataStoreRoutingData(RoutingDataReaderType.ZK, zkRealm);
@@ -496,7 +501,66 @@ public class TestFederatedZkClient extends RealmAwareZkClientTestBase {
zkClient.deleteRecursively(MetadataStoreRoutingConstants.ROUTING_DATA_PATH);
zkClient.close();
// Disable System property
- System.setProperty(RoutingSystemPropertyKeys.UPDATE_ROUTING_DATA_ON_CACHE_MISS, "false");
+ System.clearProperty(RoutingSystemPropertyKeys.UPDATE_ROUTING_DATA_ON_CACHE_MISS);
+ System.clearProperty(RoutingSystemPropertyKeys.ROUTING_DATA_UPDATE_INTERVAL_MS);
+ }
+
+ /**
+ * Test that throttle based on last reset timestamp works correctly. Here, we use ZK as the
+ * routing data source.
+ * Test scenario: set the throttle value to a high value and check that routing data update from
+ * the routing data source does NOT happen (because it would be throttled).
+ */
+ @Test(dependsOnMethods = "testUpdateRoutingDataOnCacheMissZK")
+ public void testRoutingDataUpdateThrottle() throws InvalidRoutingDataException {
+ // Call reset to set the last reset() timestamp in RoutingDataManager
+ RoutingDataManager.getInstance().reset();
+
+ // Set up routing data in ZK with the sharding keys from TestConstants.TEST_KEY_LIST_1
+ String zkRealm = "localhost:2127";
+ String newShardingKey = "/throttle";
+ ZkClient zkClient =
+ new ZkClient.Builder().setZkServer(zkRealm).setZkSerializer(new ZNRecordSerializer())
+ .build();
+ zkClient.create(MetadataStoreRoutingConstants.ROUTING_DATA_PATH, null, CreateMode.PERSISTENT);
+ ZNRecord zkRealmRecord = new ZNRecord(zkRealm);
+ zkRealmRecord.setListField(MetadataStoreRoutingConstants.ZNRECORD_LIST_FIELD_KEY,
+ new ArrayList<>(TestConstants.TEST_KEY_LIST_1));
+ zkClient.create(MetadataStoreRoutingConstants.ROUTING_DATA_PATH + "/" + zkRealm, zkRealmRecord,
+ CreateMode.PERSISTENT);
+
+ // Enable routing data update upon cache miss
+ System.setProperty(RoutingSystemPropertyKeys.UPDATE_ROUTING_DATA_ON_CACHE_MISS, "true");
+ // Set the throttle value to a very long value
+ System.setProperty(RoutingSystemPropertyKeys.ROUTING_DATA_UPDATE_INTERVAL_MS,
+ String.valueOf(Integer.MAX_VALUE));
+
+ // Create a new FederatedZkClient, whose _routingDataUpdateInterval should be
+ // Integer.MAX_VALUE. The System properties must be set before this point because the client
+ // reads them during construction.
+ FederatedZkClient federatedZkClient = new FederatedZkClient(
+ new RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder()
+ .setRoutingDataSourceType(RoutingDataReaderType.ZK.name())
+ .setRoutingDataSourceEndpoint(zkRealm).build(),
+ new RealmAwareZkClient.RealmAwareZkClientConfig());
+
+ // Add newShardingKey to ZK's routing data
+ // (written after the client was created, so the client's cached routing data lacks this key)
+ zkRealmRecord.getListField(MetadataStoreRoutingConstants.ZNRECORD_LIST_FIELD_KEY)
+ .add(newShardingKey);
+ zkClient
+ .writeData(MetadataStoreRoutingConstants.ROUTING_DATA_PATH + "/" + zkRealm, zkRealmRecord);
+
+ // exists() is expected to throw NoSuchElementException during realm lookup, so neither
+ // assertFalse nor Assert.fail should be reached when throttling works.
+ try {
+ Assert.assertFalse(federatedZkClient.exists(newShardingKey));
+ Assert.fail("NoSuchElementException expected!");
+ } catch (NoSuchElementException e) {
+ // Expected because it should not read from the routing data source because of the throttle
+ }
+
+ // Clean up
+ // NOTE(review): this cleanup is not in a finally block, so it is skipped if the check above
+ // fails, leaving the ZK routing data and both System properties set for later tests.
+ zkClient.deleteRecursively(MetadataStoreRoutingConstants.ROUTING_DATA_PATH);
+ zkClient.close();
+ federatedZkClient.close();
+ System.clearProperty(RoutingSystemPropertyKeys.UPDATE_ROUTING_DATA_ON_CACHE_MISS);
+ System.clearProperty(RoutingSystemPropertyKeys.ROUTING_DATA_UPDATE_INTERVAL_MS);
}
/*
@@ -504,7 +568,7 @@ public class TestFederatedZkClient extends RealmAwareZkClientTestBase {
* TODO: test that all raw zkClients are closed after FederatedZkClient close() is called. This
* could help avoid ZkClient leakage.
*/
- @Test(dependsOnMethods = "testUpdateRoutingDataOnCacheMissZK")
+ @Test(dependsOnMethods = "testRoutingDataUpdateThrottle")
public void testClose() {
Assert.assertFalse(_realmAwareZkClient.isClosed());