You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by hu...@apache.org on 2020/08/14 18:14:02 UTC
[helix] 07/12: Modify realm-aware ZkClient and Helix API for
configurable routing source
This is an automated email from the ASF dual-hosted git repository.
hulee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
commit 6014df6ddc89cb62163b8448cc408075b1691dc4
Author: Hunter Lee <hu...@linkedin.com>
AuthorDate: Fri Jul 24 15:01:04 2020 -0700
Modify realm-aware ZkClient and Helix API for configurable routing source
This commit changes old MSDS-based interfaces and replaces them with a more generic configurable routing data source interfaces. This commit also adds test cases for Helix API.
---
.../main/java/org/apache/helix/ConfigAccessor.java | 7 +-
.../manager/zk/GenericBaseDataAccessorBuilder.java | 3 +-
.../helix/manager/zk/GenericZkHelixApiBuilder.java | 21 +++---
.../org/apache/helix/manager/zk/ZKHelixAdmin.java | 14 ++--
.../java/org/apache/helix/manager/zk/ZKUtil.java | 3 +-
.../helix/manager/zk/ZkBaseDataAccessor.java | 7 +-
.../helix/manager/zk/ZkBucketDataAccessor.java | 2 +-
.../java/org/apache/helix/tools/ClusterSetup.java | 8 +-
.../multizk/TestMultiZkHelixJavaApis.java | 88 ++++++++++++++++++++--
.../apache/helix/rest/server/ServerContext.java | 5 +-
.../zookeeper/api/client/RealmAwareZkClient.java | 44 ++++++++---
.../zookeeper/constant/RoutingDataReaderType.java | 14 ++++
.../zookeeper/impl/client/DedicatedZkClient.java | 18 ++---
.../zookeeper/impl/client/FederatedZkClient.java | 17 ++---
.../zookeeper/impl/client/SharedZkClient.java | 17 ++---
15 files changed, 183 insertions(+), 85 deletions(-)
diff --git a/helix-core/src/main/java/org/apache/helix/ConfigAccessor.java b/helix-core/src/main/java/org/apache/helix/ConfigAccessor.java
index 48bcfd4..4885f31 100644
--- a/helix-core/src/main/java/org/apache/helix/ConfigAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/ConfigAccessor.java
@@ -19,7 +19,6 @@ package org.apache.helix;
* under the License.
*/
-import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -30,8 +29,8 @@ import java.util.TreeMap;
import org.apache.helix.manager.zk.GenericZkHelixApiBuilder;
import org.apache.helix.manager.zk.ZKUtil;
-import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.CloudConfig;
+import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.ConfigScope;
import org.apache.helix.model.CustomizedStateConfig;
import org.apache.helix.model.HelixConfigScope;
@@ -40,8 +39,8 @@ import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.RESTConfig;
import org.apache.helix.model.ResourceConfig;
import org.apache.helix.model.builder.HelixConfigScopeBuilder;
-import org.apache.helix.util.HelixUtil;
import org.apache.helix.msdcommon.exception.InvalidRoutingDataException;
+import org.apache.helix.util.HelixUtil;
import org.apache.helix.util.StringTemplate;
import org.apache.helix.zookeeper.api.client.HelixZkClient;
import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
@@ -120,7 +119,7 @@ public class ConfigAccessor {
new RealmAwareZkClient.RealmAwareZkClientConfig()
.setZkSerializer(new ZNRecordSerializer()));
return;
- } catch (IOException | InvalidRoutingDataException | IllegalStateException e) {
+ } catch (InvalidRoutingDataException | IllegalStateException e) {
throw new HelixException("Failed to create ConfigAccessor!", e);
}
}
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/GenericBaseDataAccessorBuilder.java b/helix-core/src/main/java/org/apache/helix/manager/zk/GenericBaseDataAccessorBuilder.java
index 054e693..a75c8cf 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/GenericBaseDataAccessorBuilder.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/GenericBaseDataAccessorBuilder.java
@@ -19,7 +19,6 @@ package org.apache.helix.manager.zk;
* under the License.
*/
-import java.io.IOException;
import java.util.concurrent.TimeUnit;
import org.apache.helix.HelixException;
@@ -83,7 +82,7 @@ public class GenericBaseDataAccessorBuilder<B extends GenericBaseDataAccessorBui
case MULTI_REALM:
try {
zkClient = new FederatedZkClient(connectionConfig, clientConfig);
- } catch (IOException | InvalidRoutingDataException e) {
+ } catch (InvalidRoutingDataException e) {
throw new HelixException("Not able to connect on multi-realm mode.", e);
}
break;
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/GenericZkHelixApiBuilder.java b/helix-core/src/main/java/org/apache/helix/manager/zk/GenericZkHelixApiBuilder.java
index 840ec8f..d02f67e 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/GenericZkHelixApiBuilder.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/GenericZkHelixApiBuilder.java
@@ -19,8 +19,6 @@ package org.apache.helix.manager.zk;
* under the License.
*/
-import java.io.IOException;
-
import org.apache.helix.HelixException;
import org.apache.helix.msdcommon.datamodel.MetadataStoreRoutingData;
import org.apache.helix.msdcommon.exception.InvalidRoutingDataException;
@@ -109,7 +107,7 @@ public abstract class GenericZkHelixApiBuilder<B extends GenericZkHelixApiBuilde
try {
_zkAddress = resolveZkAddressWithShardingKey(_realmAwareZkConnectionConfig);
isZkAddressSet = true;
- } catch (IOException | InvalidRoutingDataException e) {
+ } catch (InvalidRoutingDataException e) {
LOG.warn(
"GenericZkHelixApiBuilder: ZkAddress is not set and failed to resolve ZkAddress with ZK path sharding key!",
e);
@@ -166,7 +164,7 @@ public abstract class GenericZkHelixApiBuilder<B extends GenericZkHelixApiBuilde
try {
return new FederatedZkClient(connectionConfig,
clientConfig.setZkSerializer(new ZNRecordSerializer()));
- } catch (IOException | InvalidRoutingDataException | IllegalStateException e) {
+ } catch (InvalidRoutingDataException | IllegalStateException e) {
throw new HelixException("GenericZkHelixApiBuilder: Failed to create FederatedZkClient!",
e);
}
@@ -197,17 +195,18 @@ public abstract class GenericZkHelixApiBuilder<B extends GenericZkHelixApiBuilde
* ZK address is not given in this Builder.
* @param connectionConfig
* @return
- * @throws IOException
* @throws InvalidRoutingDataException
*/
private String resolveZkAddressWithShardingKey(
RealmAwareZkClient.RealmAwareZkConnectionConfig connectionConfig)
- throws IOException, InvalidRoutingDataException {
- boolean isMsdsEndpointSet =
- connectionConfig.getMsdsEndpoint() != null && !connectionConfig.getMsdsEndpoint().isEmpty();
- // TODO: Make RoutingDataReaderType configurable
- MetadataStoreRoutingData routingData = isMsdsEndpointSet ? RoutingDataManager
- .getMetadataStoreRoutingData(RoutingDataReaderType.HTTP, connectionConfig.getMsdsEndpoint())
+ throws InvalidRoutingDataException {
+ boolean isRoutingDataSourceEndpointSet =
+ connectionConfig.getRoutingDataSourceEndpoint() != null && !connectionConfig
+ .getRoutingDataSourceEndpoint().isEmpty();
+ MetadataStoreRoutingData routingData = isRoutingDataSourceEndpointSet ? RoutingDataManager
+ .getMetadataStoreRoutingData(
+ RoutingDataReaderType.lookUp(connectionConfig.getRoutingDataSourceType()),
+ connectionConfig.getRoutingDataSourceEndpoint())
: RoutingDataManager.getMetadataStoreRoutingData();
return routingData.getMetadataStoreRealm(connectionConfig.getZkRealmShardingKey());
}
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
index 348f8d8..1759116 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
@@ -151,7 +151,7 @@ public class ZKHelixAdmin implements HelixAdmin {
try {
zkClient = new FederatedZkClient(
new RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder().build(), clientConfig);
- } catch (IllegalStateException | IOException | InvalidRoutingDataException e) {
+ } catch (IllegalStateException | InvalidRoutingDataException e) {
throw new HelixException("Not able to connect on multi-realm mode.", e);
}
} else {
@@ -965,13 +965,15 @@ public class ZKHelixAdmin implements HelixAdmin {
|| _zkClient instanceof FederatedZkClient) {
// If on multi-zk mode, we retrieve cluster information from Metadata Store Directory Service.
Map<String, List<String>> realmToShardingKeys;
- String msdsEndpoint = _zkClient.getRealmAwareZkConnectionConfig().getMsdsEndpoint();
- if (msdsEndpoint == null || msdsEndpoint.isEmpty()) {
+ String routingDataSourceEndpoint =
+ _zkClient.getRealmAwareZkConnectionConfig().getRoutingDataSourceEndpoint();
+ if (routingDataSourceEndpoint == null || routingDataSourceEndpoint.isEmpty()) {
+ // If endpoint is not given explicitly, use HTTP and the endpoint set in System Properties
realmToShardingKeys = RoutingDataManager.getRawRoutingData();
} else {
- // TODO: Make RoutingDataReaderType configurable
- realmToShardingKeys =
- RoutingDataManager.getRawRoutingData(RoutingDataReaderType.HTTP, msdsEndpoint);
+ realmToShardingKeys = RoutingDataManager.getRawRoutingData(RoutingDataReaderType
+ .lookUp(_zkClient.getRealmAwareZkConnectionConfig().getRoutingDataSourceType()),
+ routingDataSourceEndpoint);
}
if (realmToShardingKeys == null || realmToShardingKeys.isEmpty()) {
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKUtil.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKUtil.java
index cda5f39..1015834 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKUtil.java
@@ -19,7 +19,6 @@ package org.apache.helix.manager.zk;
* under the License.
*/
-import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -622,7 +621,7 @@ public final class ZKUtil {
RealmAwareZkClient.RealmAwareZkClientConfig clientConfig =
new RealmAwareZkClient.RealmAwareZkClientConfig();
return new FederatedZkClient(connectionConfig, clientConfig);
- } catch (IllegalArgumentException | IOException | InvalidRoutingDataException e) {
+ } catch (IllegalArgumentException | InvalidRoutingDataException e) {
throw new HelixException("Not able to connect on realm-aware mode", e);
}
}
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java
index d2096b0..4e6a7b0 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java
@@ -19,7 +19,6 @@ package org.apache.helix.manager.zk;
* under the License.
*/
-import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -40,6 +39,7 @@ import org.apache.helix.util.HelixUtil;
import org.apache.helix.zookeeper.api.client.HelixZkClient;
import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.helix.zookeeper.datamodel.serializer.ZNRecordSerializer;
import org.apache.helix.zookeeper.exception.ZkClientException;
import org.apache.helix.zookeeper.impl.client.FederatedZkClient;
import org.apache.helix.zookeeper.impl.factory.DedicatedZkClientFactory;
@@ -52,9 +52,8 @@ import org.apache.helix.zookeeper.zkclient.exception.ZkBadVersionException;
import org.apache.helix.zookeeper.zkclient.exception.ZkException;
import org.apache.helix.zookeeper.zkclient.exception.ZkNoNodeException;
import org.apache.helix.zookeeper.zkclient.exception.ZkNodeExistsException;
-import org.apache.helix.zookeeper.zkclient.serialize.ZkSerializer;
import org.apache.helix.zookeeper.zkclient.serialize.PathBasedZkSerializer;
-import org.apache.helix.zookeeper.datamodel.serializer.ZNRecordSerializer;
+import org.apache.helix.zookeeper.zkclient.serialize.ZkSerializer;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException.Code;
import org.apache.zookeeper.data.Stat;
@@ -1342,7 +1341,7 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
try {
return new FederatedZkClient(
new RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder().build(), clientConfig);
- } catch (IllegalStateException | IOException | InvalidRoutingDataException e) {
+ } catch (IllegalStateException | InvalidRoutingDataException e) {
throw new HelixException("Not able to connect on multi-realm mode.", e);
}
}
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBucketDataAccessor.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBucketDataAccessor.java
index 8f0aa16..efdde81 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBucketDataAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBucketDataAccessor.java
@@ -94,7 +94,7 @@ public class ZkBucketDataAccessor implements BucketDataAccessor, AutoCloseable {
RealmAwareZkClient.RealmAwareZkClientConfig clientConfig =
new RealmAwareZkClient.RealmAwareZkClientConfig();
_zkClient = new FederatedZkClient(connectionConfig, clientConfig);
- } catch (IllegalArgumentException | IOException | InvalidRoutingDataException e) {
+ } catch (IllegalArgumentException | InvalidRoutingDataException e) {
throw new HelixException("Not able to connect on realm-aware mode", e);
}
} else {
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
index 59180eb..667cab7 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
@@ -40,12 +40,10 @@ import org.apache.helix.ConfigAccessor;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixConstants;
import org.apache.helix.HelixException;
-import org.apache.helix.PropertyKey.Builder;
-import org.apache.helix.zookeeper.datamodel.ZNRecord;
-import org.apache.helix.cloud.azure.AzureConstants;
-import org.apache.helix.cloud.constants.CloudProvider;
import org.apache.helix.PropertyKey;
import org.apache.helix.SystemPropertyKeys;
+import org.apache.helix.cloud.azure.AzureConstants;
+import org.apache.helix.cloud.constants.CloudProvider;
import org.apache.helix.manager.zk.GenericZkHelixApiBuilder;
import org.apache.helix.manager.zk.ZKHelixAdmin;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
@@ -162,7 +160,7 @@ public class ClusterSetup {
new RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder().build(),
new RealmAwareZkClient.RealmAwareZkClientConfig()
.setZkSerializer(new ZNRecordSerializer()));
- } catch (IOException | InvalidRoutingDataException | IllegalStateException e) {
+ } catch (InvalidRoutingDataException | IllegalStateException e) {
throw new HelixException("Failed to create ConfigAccessor!", e);
}
} else {
diff --git a/helix-core/src/test/java/org/apache/helix/integration/multizk/TestMultiZkHelixJavaApis.java b/helix-core/src/test/java/org/apache/helix/integration/multizk/TestMultiZkHelixJavaApis.java
index 5146ce6..d72318a 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/multizk/TestMultiZkHelixJavaApis.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/multizk/TestMultiZkHelixJavaApis.java
@@ -20,6 +20,7 @@ package org.apache.helix.integration.multizk;
*/
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -73,11 +74,14 @@ import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier;
import org.apache.helix.zookeeper.api.client.HelixZkClient;
import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
import org.apache.helix.zookeeper.api.client.ZkClientType;
+import org.apache.helix.zookeeper.constant.RoutingDataReaderType;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.helix.zookeeper.datamodel.serializer.ZNRecordSerializer;
+import org.apache.helix.zookeeper.exception.MultiZkException;
import org.apache.helix.zookeeper.impl.client.FederatedZkClient;
import org.apache.helix.zookeeper.impl.factory.DedicatedZkClientFactory;
import org.apache.helix.zookeeper.impl.factory.SharedZkClientFactory;
+import org.apache.helix.zookeeper.routing.RoutingDataManager;
import org.apache.helix.zookeeper.zkclient.ZkServer;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
@@ -109,16 +113,17 @@ public class TestMultiZkHelixJavaApis {
private HelixAdmin _zkHelixAdmin;
// Save System property configs from before this test and pass onto after the test
- private Map<String, String> _configStore = new HashMap<>();
+ private final Map<String, String> _configStore = new HashMap<>();
+
+ private static final String ZK_PREFIX = "localhost:";
+ private static final int ZK_START_PORT = 8777;
+ private String _msdsEndpoint;
@BeforeClass
public void beforeClass() throws Exception {
// Create 3 in-memory zookeepers and routing mapping
- final String zkPrefix = "localhost:";
- final int zkStartPort = 8777;
-
for (int i = 0; i < NUM_ZK; i++) {
- String zkAddress = zkPrefix + (zkStartPort + i);
+ String zkAddress = ZK_PREFIX + (ZK_START_PORT + i);
ZK_SERVER_MAP.put(zkAddress, TestHelper.startZkServer(zkAddress));
ZK_CLIENT_MAP.put(zkAddress, DedicatedZkClientFactory.getInstance()
.buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddress),
@@ -132,6 +137,8 @@ public class TestMultiZkHelixJavaApis {
final String msdsHostName = "localhost";
final int msdsPort = 11117;
final String msdsNamespace = "multiZkTest";
+ _msdsEndpoint =
+ "http://" + msdsHostName + ":" + msdsPort + "/admin/v2/namespaces/" + msdsNamespace;
_msds = new MockMetadataStoreDirectoryServer(msdsHostName, msdsPort, msdsNamespace,
_rawRoutingData);
_msds.startServer();
@@ -151,8 +158,7 @@ public class TestMultiZkHelixJavaApis {
// Turn on multiZk mode in System config
System.setProperty(SystemPropertyKeys.MULTI_ZK_ENABLED, "true");
// MSDS endpoint: http://localhost:11117/admin/v2/namespaces/multiZkTest
- System.setProperty(MetadataStoreRoutingConstants.MSDS_SERVER_ENDPOINT_KEY,
- "http://" + msdsHostName + ":" + msdsPort + "/admin/v2/namespaces/" + msdsNamespace);
+ System.setProperty(MetadataStoreRoutingConstants.MSDS_SERVER_ENDPOINT_KEY, _msdsEndpoint);
// Create a FederatedZkClient for admin work
_zkClient =
@@ -782,7 +788,8 @@ public class TestMultiZkHelixJavaApis {
new MockMetadataStoreDirectoryServer("localhost", 11118, "multiZkTest", secondRoutingData);
final RealmAwareZkClient.RealmAwareZkConnectionConfig connectionConfig =
new RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder()
- .setMsdsEndpoint(secondMsds.getEndpoint()).build();
+ .setRoutingDataSourceType(RoutingDataReaderType.HTTP.name())
+ .setRoutingDataSourceEndpoint(secondMsds.getEndpoint()).build();
secondMsds.startServer();
try {
@@ -996,4 +1003,69 @@ public class TestMultiZkHelixJavaApis {
}
return sb.toString();
}
+
+ /**
+ * Testing using ZK as the routing data source. We use BaseDataAccessor as the representative
+ * Helix API.
+ * Two modes are tested: ZK and HTTP-ZK fallback
+ */
+ @Test(dependsOnMethods = "testDifferentMsdsEndpointConfigs")
+ public void testZkRoutingDataSourceConfigs() {
+ // Set up routing data in ZK by connecting directly to ZK
+ BaseDataAccessor<ZNRecord> accessor =
+ new ZkBaseDataAccessor.Builder<ZNRecord>().setZkAddress(ZK_PREFIX + ZK_START_PORT).build();
+
+ // Create ZK realm routing data ZNRecord
+ _rawRoutingData.forEach((realm, keys) -> {
+ ZNRecord znRecord = new ZNRecord(realm);
+ znRecord.setListField(MetadataStoreRoutingConstants.ZNRECORD_LIST_FIELD_KEY,
+ new ArrayList<>(keys));
+ accessor.set(MetadataStoreRoutingConstants.ROUTING_DATA_PATH + "/" + realm, znRecord,
+ AccessOption.PERSISTENT);
+ });
+
+ // Create connection configs with the source type set to each type
+ final RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder connectionConfigBuilder =
+ new RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder();
+ final RealmAwareZkClient.RealmAwareZkConnectionConfig connectionConfigZk =
+ connectionConfigBuilder.setRoutingDataSourceType(RoutingDataReaderType.ZK.name())
+ .setRoutingDataSourceEndpoint(ZK_PREFIX + ZK_START_PORT).build();
+ final RealmAwareZkClient.RealmAwareZkConnectionConfig connectionConfigHttpZkFallback =
+ connectionConfigBuilder
+ .setRoutingDataSourceType(RoutingDataReaderType.HTTP_ZK_FALLBACK.name())
+ .setRoutingDataSourceEndpoint(_msdsEndpoint + "," + ZK_PREFIX + ZK_START_PORT).build();
+ final RealmAwareZkClient.RealmAwareZkConnectionConfig connectionConfigHttp =
+ connectionConfigBuilder.setRoutingDataSourceType(RoutingDataReaderType.HTTP.name())
+ .setRoutingDataSourceEndpoint(_msdsEndpoint).build();
+
+ // Reset cached routing data
+ RoutingDataManager.reset();
+ // Shutdown MSDS to ensure that these accessors are able to pull routing data from ZK
+ _msds.stopServer();
+
+ // Create a BaseDataAccessor instance with the connection config
+ BaseDataAccessor<ZNRecord> zkBasedAccessor = new ZkBaseDataAccessor.Builder<ZNRecord>()
+ .setRealmAwareZkConnectionConfig(connectionConfigZk).build();
+ BaseDataAccessor<ZNRecord> httpZkFallbackBasedAccessor =
+ new ZkBaseDataAccessor.Builder<ZNRecord>()
+ .setRealmAwareZkConnectionConfig(connectionConfigHttpZkFallback).build();
+ try {
+ BaseDataAccessor<ZNRecord> httpBasedAccessor = new ZkBaseDataAccessor.Builder<ZNRecord>()
+ .setRealmAwareZkConnectionConfig(connectionConfigHttp).build();
+ Assert.fail("Must fail with a MultiZkException because HTTP connection will be refused.");
+ } catch (MultiZkException e) {
+ // Okay
+ }
+
+ // Check that all clusters appear as existing to this accessor
+ CLUSTER_LIST.forEach(cluster -> {
+ Assert.assertTrue(zkBasedAccessor.exists("/" + cluster, AccessOption.PERSISTENT));
+ Assert.assertTrue(httpZkFallbackBasedAccessor.exists("/" + cluster, AccessOption.PERSISTENT));
+ });
+
+ // Close all connections
+ accessor.close();
+ zkBasedAccessor.close();
+ httpZkFallbackBasedAccessor.close();
+ }
}
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/ServerContext.java b/helix-rest/src/main/java/org/apache/helix/rest/server/ServerContext.java
index f5d8915..176180f 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/server/ServerContext.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/server/ServerContext.java
@@ -20,7 +20,6 @@ package org.apache.helix.rest.server;
* under the License.
*/
-import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -134,13 +133,13 @@ public class ServerContext implements IZkDataListener, IZkChildListener, IZkStat
new RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder();
// If MSDS endpoint is set for this namespace, use that instead.
if (_msdsEndpoint != null && !_msdsEndpoint.isEmpty()) {
- connectionConfigBuilder.setMsdsEndpoint(_msdsEndpoint);
+ connectionConfigBuilder.setRoutingDataSourceEndpoint(_msdsEndpoint);
}
_zkClient = new FederatedZkClient(connectionConfigBuilder.build(),
new RealmAwareZkClient.RealmAwareZkClientConfig()
.setZkSerializer(new ZNRecordSerializer()));
LOG.info("ServerContext: FederatedZkClient created successfully!");
- } catch (IOException | InvalidRoutingDataException | IllegalStateException e) {
+ } catch (InvalidRoutingDataException | IllegalStateException e) {
throw new HelixException("Failed to create FederatedZkClient!", e);
}
} else {
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/client/RealmAwareZkClient.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/client/RealmAwareZkClient.java
index b1ecc38..e8c91f1 100644
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/client/RealmAwareZkClient.java
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/client/RealmAwareZkClient.java
@@ -23,6 +23,7 @@ import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.helix.msdcommon.constant.MetadataStoreRoutingConstants;
+import org.apache.helix.zookeeper.constant.RoutingDataReaderType;
import org.apache.helix.zookeeper.zkclient.DataUpdater;
import org.apache.helix.zookeeper.zkclient.IZkChildListener;
import org.apache.helix.zookeeper.zkclient.IZkDataListener;
@@ -56,8 +57,7 @@ public interface RealmAwareZkClient {
* MULTI_REALM: CRUD and change subscription are supported. Operations involving EPHEMERAL CreateMode will throw an UnsupportedOperationException.
*/
enum RealmMode {
- SINGLE_REALM,
- MULTI_REALM
+ SINGLE_REALM, MULTI_REALM
}
int DEFAULT_OPERATION_TIMEOUT = Integer.MAX_VALUE;
@@ -65,6 +65,7 @@ public interface RealmAwareZkClient {
int DEFAULT_SESSION_TIMEOUT = 30 * 1000;
// listener subscription
+
/**
* Subscribe the path and the listener will handle child events of the path.
* Add exists watch to path if the path does not exist in ZooKeeper server.
@@ -86,7 +87,8 @@ public interface RealmAwareZkClient {
* @return ChildrentSubsribeResult. If the path does not exists, the isInstalled field
* is false. Otherwise, it is true and list of children are returned.
*/
- ChildrenSubscribeResult subscribeChildChanges(String path, IZkChildListener listener, boolean skipWatchingNonExistNode);
+ ChildrenSubscribeResult subscribeChildChanges(String path, IZkChildListener listener,
+ boolean skipWatchingNonExistNode);
void unsubscribeChildChanges(String path, IZkChildListener listener);
@@ -108,7 +110,8 @@ public interface RealmAwareZkClient {
* @param skipWatchingNonExistNode True means not installing any watch if path does not exist.
* return True if installation of watch succeed. Otherwise, return false.
*/
- boolean subscribeDataChanges(String path, IZkDataListener listener, boolean skipWatchingNonExistNode);
+ boolean subscribeDataChanges(String path, IZkDataListener listener,
+ boolean skipWatchingNonExistNode);
void unsubscribeDataChanges(String path, IZkDataListener listener);
@@ -378,12 +381,14 @@ public interface RealmAwareZkClient {
* NOTE: this field will be ignored if RealmMode is MULTI_REALM!
*/
private String _zkRealmShardingKey;
- private String _msdsEndpoint;
+ private String _routingDataSourceType;
+ private String _routingDataSourceEndpoint;
private int _sessionTimeout = DEFAULT_SESSION_TIMEOUT;
private RealmAwareZkConnectionConfig(Builder builder) {
_zkRealmShardingKey = builder._zkRealmShardingKey;
- _msdsEndpoint = builder._msdsEndpoint;
+ _routingDataSourceType = builder._routingDataSourceType;
+ _routingDataSourceEndpoint = builder._routingDataSourceEndpoint;
_sessionTimeout = builder._sessionTimeout;
}
@@ -424,14 +429,19 @@ public interface RealmAwareZkClient {
return _sessionTimeout;
}
- public String getMsdsEndpoint() {
- return _msdsEndpoint;
+ public String getRoutingDataSourceType() {
+ return _routingDataSourceType;
+ }
+
+ public String getRoutingDataSourceEndpoint() {
+ return _routingDataSourceEndpoint;
}
public static class Builder {
private RealmMode _realmMode;
private String _zkRealmShardingKey;
- private String _msdsEndpoint;
+ private String _routingDataSourceType;
+ private String _routingDataSourceEndpoint;
private int _sessionTimeout = DEFAULT_SESSION_TIMEOUT;
public Builder() {
@@ -453,8 +463,13 @@ public interface RealmAwareZkClient {
return this;
}
- public Builder setMsdsEndpoint(String msdsEndpoint) {
- _msdsEndpoint = msdsEndpoint;
+ public Builder setRoutingDataSourceType(String routingDataSourceType) {
+ _routingDataSourceType = routingDataSourceType;
+ return this;
+ }
+
+ public Builder setRoutingDataSourceEndpoint(String routingDataSourceEndpoint) {
+ _routingDataSourceEndpoint = routingDataSourceEndpoint;
return this;
}
@@ -482,6 +497,13 @@ public interface RealmAwareZkClient {
throw new IllegalArgumentException(
"RealmAwareZkConnectionConfig.Builder: ZK sharding key must be set on single-realm mode!");
}
+ if ((_routingDataSourceEndpoint == null && _routingDataSourceType != null) || (
+ _routingDataSourceEndpoint != null && _routingDataSourceType == null)) {
+ // For routing data source type and endpoint, if one is set and not the other, it is invalid
+ throw new IllegalArgumentException(
+ "RealmAwareZkConnectionConfig.Builder: routing data source type and endpoint are not configured properly! Type: "
+ + _routingDataSourceType + " Endpoint: " + _routingDataSourceEndpoint);
+ }
}
}
}
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/constant/RoutingDataReaderType.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/constant/RoutingDataReaderType.java
index aedef36..33b6c70 100644
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/constant/RoutingDataReaderType.java
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/constant/RoutingDataReaderType.java
@@ -19,6 +19,10 @@ package org.apache.helix.zookeeper.constant;
* under the License.
*/
+import org.apache.commons.lang3.EnumUtils;
+import org.apache.helix.zookeeper.exception.MultiZkException;
+
+
/**
* RoutingDataReaderType is an enum that designates the reader type and the class name that can be
* used to create an instance of RoutingDataReader by reflection.
@@ -37,4 +41,14 @@ public enum RoutingDataReaderType {
public String getClassName() {
return this.className;
}
+
+ public static RoutingDataReaderType lookUp(String enumString) {
+ RoutingDataReaderType type =
+ EnumUtils.getEnumIgnoreCase(RoutingDataReaderType.class, enumString);
+ if (type == null) {
+ throw new MultiZkException(
+ "RoutingDataReaderType::lookUp: Unable to find the enum! String given: " + enumString);
+ }
+ return type;
+ }
}
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/DedicatedZkClient.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/DedicatedZkClient.java
index 501670c..dbe64f3 100644
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/DedicatedZkClient.java
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/DedicatedZkClient.java
@@ -19,7 +19,6 @@ package org.apache.helix.zookeeper.impl.client;
* under the License.
*/
-import java.io.IOException;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.TimeUnit;
@@ -69,12 +68,10 @@ public class DedicatedZkClient implements RealmAwareZkClient {
* such as CRUD, change callback, and ephemeral operations for a single ZkRealmShardingKey.
* @param connectionConfig
* @param clientConfig
- * @throws IOException
* @throws InvalidRoutingDataException
*/
public DedicatedZkClient(RealmAwareZkClient.RealmAwareZkConnectionConfig connectionConfig,
- RealmAwareZkClient.RealmAwareZkClientConfig clientConfig)
- throws IOException, InvalidRoutingDataException {
+ RealmAwareZkClient.RealmAwareZkClientConfig clientConfig) throws InvalidRoutingDataException {
if (connectionConfig == null) {
throw new IllegalArgumentException("RealmAwareZkConnectionConfig cannot be null!");
}
@@ -84,14 +81,15 @@ public class DedicatedZkClient implements RealmAwareZkClient {
_connectionConfig = connectionConfig;
_clientConfig = clientConfig;
- // Get the routing data from a static Singleton HttpRoutingDataReader
- String msdsEndpoint = connectionConfig.getMsdsEndpoint();
- if (msdsEndpoint == null || msdsEndpoint.isEmpty()) {
+ // Get MetadataStoreRoutingData
+ String routingDataSourceEndpoint = connectionConfig.getRoutingDataSourceEndpoint();
+ if (routingDataSourceEndpoint == null || routingDataSourceEndpoint.isEmpty()) {
+ // If endpoint is not given explicitly, use HTTP and the endpoint set in System Properties
_metadataStoreRoutingData = RoutingDataManager.getMetadataStoreRoutingData();
} else {
- // TODO: Make RoutingDataReaderType configurable
- _metadataStoreRoutingData =
- RoutingDataManager.getMetadataStoreRoutingData(RoutingDataReaderType.HTTP, msdsEndpoint);
+ _metadataStoreRoutingData = RoutingDataManager.getMetadataStoreRoutingData(
+ RoutingDataReaderType.lookUp(connectionConfig.getRoutingDataSourceType()),
+ routingDataSourceEndpoint);
}
_zkRealmShardingKey = connectionConfig.getZkRealmShardingKey();
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/FederatedZkClient.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/FederatedZkClient.java
index 22b9fe5..01e7fc7 100644
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/FederatedZkClient.java
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/FederatedZkClient.java
@@ -19,7 +19,6 @@ package org.apache.helix.zookeeper.impl.client;
* under the License.
*/
-import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -87,8 +86,7 @@ public class FederatedZkClient implements RealmAwareZkClient {
// TODO: support capacity of ZkClient number in one FederatedZkClient and do garbage collection.
public FederatedZkClient(RealmAwareZkClient.RealmAwareZkConnectionConfig connectionConfig,
- RealmAwareZkClient.RealmAwareZkClientConfig clientConfig)
- throws IOException, InvalidRoutingDataException {
+ RealmAwareZkClient.RealmAwareZkClientConfig clientConfig) throws InvalidRoutingDataException {
if (connectionConfig == null) {
throw new IllegalArgumentException("RealmAwareZkConnectionConfig cannot be null!");
}
@@ -96,14 +94,15 @@ public class FederatedZkClient implements RealmAwareZkClient {
throw new IllegalArgumentException("RealmAwareZkClientConfig cannot be null!");
}
- // Attempt to get MetadataStoreRoutingData
- String msdsEndpoint = connectionConfig.getMsdsEndpoint();
- if (msdsEndpoint == null || msdsEndpoint.isEmpty()) {
+ // Get MetadataStoreRoutingData
+ String routingDataSourceEndpoint = connectionConfig.getRoutingDataSourceEndpoint();
+ if (routingDataSourceEndpoint == null || routingDataSourceEndpoint.isEmpty()) {
+ // If endpoint is not given explicitly, use HTTP and the endpoint set in System Properties
_metadataStoreRoutingData = RoutingDataManager.getMetadataStoreRoutingData();
} else {
- // TODO: Make RoutingDataReaderType configurable
- _metadataStoreRoutingData =
- RoutingDataManager.getMetadataStoreRoutingData(RoutingDataReaderType.HTTP, msdsEndpoint);
+ _metadataStoreRoutingData = RoutingDataManager.getMetadataStoreRoutingData(
+ RoutingDataReaderType.lookUp(connectionConfig.getRoutingDataSourceType()),
+ routingDataSourceEndpoint);
}
_isClosed = false;
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/SharedZkClient.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/SharedZkClient.java
index 341731e..7c3a562 100644
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/SharedZkClient.java
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/SharedZkClient.java
@@ -19,7 +19,6 @@ package org.apache.helix.zookeeper.impl.client;
* under the License.
*/
-import java.io.IOException;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.TimeUnit;
@@ -65,8 +64,7 @@ public class SharedZkClient implements RealmAwareZkClient {
private final RealmAwareZkClient.RealmAwareZkClientConfig _clientConfig;
public SharedZkClient(RealmAwareZkClient.RealmAwareZkConnectionConfig connectionConfig,
- RealmAwareZkClient.RealmAwareZkClientConfig clientConfig)
- throws IOException, InvalidRoutingDataException {
+ RealmAwareZkClient.RealmAwareZkClientConfig clientConfig) throws InvalidRoutingDataException {
if (connectionConfig == null) {
throw new IllegalArgumentException("RealmAwareZkConnectionConfig cannot be null!");
}
@@ -76,14 +74,15 @@ public class SharedZkClient implements RealmAwareZkClient {
_connectionConfig = connectionConfig;
_clientConfig = clientConfig;
- // Get the routing data from a static Singleton HttpRoutingDataReader
- String msdsEndpoint = connectionConfig.getMsdsEndpoint();
- if (msdsEndpoint == null || msdsEndpoint.isEmpty()) {
+ // Get MetadataStoreRoutingData
+ String routingDataSourceEndpoint = connectionConfig.getRoutingDataSourceEndpoint();
+ if (routingDataSourceEndpoint == null || routingDataSourceEndpoint.isEmpty()) {
+ // If endpoint is not given explicitly, use HTTP and the endpoint set in System Properties
_metadataStoreRoutingData = RoutingDataManager.getMetadataStoreRoutingData();
} else {
- // TODO: Make RoutingDataReaderType configurable
- _metadataStoreRoutingData =
- RoutingDataManager.getMetadataStoreRoutingData(RoutingDataReaderType.HTTP, msdsEndpoint);
+ _metadataStoreRoutingData = RoutingDataManager.getMetadataStoreRoutingData(
+ RoutingDataReaderType.lookUp(connectionConfig.getRoutingDataSourceType()),
+ routingDataSourceEndpoint);
}
_zkRealmShardingKey = connectionConfig.getZkRealmShardingKey();