You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by hu...@apache.org on 2020/04/08 22:54:18 UTC
[helix] 48/50: Make multiZkEnabled configurable in
HelixRestNamespace (#915)
This is an automated email from the ASF dual-hosted git repository.
hulee pushed a commit to branch zooscalability_merge
in repository https://gitbox.apache.org/repos/asf/helix.git
commit 45cc0fc67973791144e8f61d962bc803d5ce48c2
Author: Hunter Lee <hu...@linkedin.com>
AuthorDate: Thu Mar 26 17:54:58 2020 -0700
Make multiZkEnabled configurable in HelixRestNamespace (#915)
It was observed that we need more fine-grained control over this multiZkEnabled config because there could exist namespaces with differing modes. Because multiple namespaces may be co-deployed, we cannot simply make it a system config.
---
.../org/apache/helix/manager/zk/ZKHelixAdmin.java | 6 ++-
.../helix/rest/common/HelixRestNamespace.java | 18 ++++++++-
.../apache/helix/rest/server/HelixRestMain.java | 4 +-
.../apache/helix/rest/server/HelixRestServer.java | 3 +-
.../apache/helix/rest/server/ServerContext.java | 43 ++++++++++++++--------
5 files changed, 52 insertions(+), 22 deletions(-)
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
index 4f5b67e..5ded96d 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
@@ -936,8 +936,10 @@ public class ZKHelixAdmin implements HelixAdmin {
public List<String> getClusters() {
if (Boolean.getBoolean(SystemPropertyKeys.MULTI_ZK_ENABLED)
|| _zkClient instanceof FederatedZkClient) {
- throw new UnsupportedOperationException(
- "getClusters() is not supported in multi-realm mode! Use Metadata Store Directory Service instead!");
+ String errMsg =
+ "getClusters() is not supported in multi-realm mode! Use Metadata Store Directory Service instead!";
+ LOG.error(errMsg);
+ throw new UnsupportedOperationException(errMsg);
}
List<String> zkToplevelPathes = _zkClient.getChildren("/");
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/common/HelixRestNamespace.java b/helix-rest/src/main/java/org/apache/helix/rest/common/HelixRestNamespace.java
index 0632f36..29b1561 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/common/HelixRestNamespace.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/common/HelixRestNamespace.java
@@ -35,6 +35,7 @@ public class HelixRestNamespace {
METADATA_STORE_TYPE,
METADATA_STORE_ADDRESS,
IS_DEFAULT,
+ MULTI_ZK_ENABLED,
MSDS_ENDPOINT
}
@@ -67,6 +68,11 @@ public class HelixRestNamespace {
private boolean _isDefault;
/**
+ * Flag indicating whether this namespace should have multi-zk feature enabled.
+ */
+ private boolean _isMultiZkEnabled;
+
+ /**
* Endpoint for accessing MSDS for this namespace.
*/
private String _msdsEndpoint;
@@ -77,15 +83,17 @@ public class HelixRestNamespace {
public HelixRestNamespace(String name, HelixMetadataStoreType metadataStoreType,
String metadataStoreAddress, boolean isDefault) throws IllegalArgumentException {
- this(name, metadataStoreType, metadataStoreAddress, isDefault, null);
+ this(name, metadataStoreType, metadataStoreAddress, isDefault, false, null);
}
public HelixRestNamespace(String name, HelixMetadataStoreType metadataStoreType,
- String metadataStoreAddress, boolean isDefault, String msdsEndpoint) {
+ String metadataStoreAddress, boolean isDefault, boolean isMultiZkEnabled,
+ String msdsEndpoint) {
_name = name;
_metadataStoreAddress = metadataStoreAddress;
_metadataStoreType = metadataStoreType;
_isDefault = isDefault;
+ _isMultiZkEnabled = isMultiZkEnabled;
_msdsEndpoint = msdsEndpoint;
validate();
}
@@ -119,9 +127,15 @@ public class HelixRestNamespace {
Map<String, String> ret = new HashMap<>();
ret.put(HelixRestNamespaceProperty.NAME.name(), _name);
ret.put(HelixRestNamespaceProperty.IS_DEFAULT.name(), String.valueOf(_isDefault));
+ ret.put(HelixRestNamespaceProperty.MULTI_ZK_ENABLED.name(), String.valueOf(_isMultiZkEnabled));
+ ret.put(HelixRestNamespaceProperty.MSDS_ENDPOINT.name(), String.valueOf(_msdsEndpoint));
return ret;
}
+ public boolean isMultiZkEnabled() {
+ return _isMultiZkEnabled;
+ }
+
public String getMsdsEndpoint() {
return _msdsEndpoint;
}
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/HelixRestMain.java b/helix-rest/src/main/java/org/apache/helix/rest/server/HelixRestMain.java
index 49940c3..ffc8aed 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/server/HelixRestMain.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/server/HelixRestMain.java
@@ -151,7 +151,9 @@ public class HelixRestMain {
HelixRestNamespace.HelixMetadataStoreType.valueOf(
config.get(HelixRestNamespace.HelixRestNamespaceProperty.METADATA_STORE_TYPE.name())),
config.get(HelixRestNamespace.HelixRestNamespaceProperty.METADATA_STORE_ADDRESS.name()),
- false, config.get(HelixRestNamespace.HelixRestNamespaceProperty.MSDS_ENDPOINT.name())));
+ false, Boolean.parseBoolean(
+ config.get(HelixRestNamespace.HelixRestNamespaceProperty.MULTI_ZK_ENABLED.name())),
+ config.get(HelixRestNamespace.HelixRestNamespaceProperty.MSDS_ENDPOINT.name())));
}
}
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/HelixRestServer.java b/helix-rest/src/main/java/org/apache/helix/rest/server/HelixRestServer.java
index e6b5b34..cb199cb 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/server/HelixRestServer.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/server/HelixRestServer.java
@@ -149,7 +149,8 @@ public class HelixRestServer {
// Enable the default statistical monitoring MBean for Jersey server
cfg.property(ServerProperties.MONITORING_STATISTICS_MBEANS_ENABLED, true);
cfg.property(ContextPropertyKeys.SERVER_CONTEXT.name(),
- new ServerContext(namespace.getMetadataStoreAddress(), namespace.getMsdsEndpoint()));
+ new ServerContext(namespace.getMetadataStoreAddress(), namespace.isMultiZkEnabled(),
+ namespace.getMsdsEndpoint()));
if (type == ServletType.DEFAULT_SERVLET) {
cfg.property(ContextPropertyKeys.ALL_NAMESPACES.name(), _helixNamespaces);
} else {
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/ServerContext.java b/helix-rest/src/main/java/org/apache/helix/rest/server/ServerContext.java
index 52f1738..c6632c2 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/server/ServerContext.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/server/ServerContext.java
@@ -61,6 +61,7 @@ public class ServerContext implements IZkDataListener, IZkChildListener, IZkStat
private static final Logger LOG = LoggerFactory.getLogger(ServerContext.class);
private final String _zkAddr;
+ private boolean _isMultiZkEnabled;
private final String _msdsEndpoint;
private volatile RealmAwareZkClient _zkClient;
@@ -83,16 +84,18 @@ public class ServerContext implements IZkDataListener, IZkChildListener, IZkStat
private RealmAwareZkClient _zkClientForListener;
public ServerContext(String zkAddr) {
- this(zkAddr, null);
+ this(zkAddr, false, null);
}
/**
* Initializes a ServerContext for this namespace.
* @param zkAddr routing ZK address (on multi-zk mode)
+ * @param isMultiZkEnabled boolean flag for whether multi-zk mode is enabled
* @param msdsEndpoint if given, this server context will try to read routing data from this MSDS.
*/
- public ServerContext(String zkAddr, String msdsEndpoint) {
+ public ServerContext(String zkAddr, boolean isMultiZkEnabled, String msdsEndpoint) {
_zkAddr = zkAddr;
+ _isMultiZkEnabled = isMultiZkEnabled;
_msdsEndpoint = msdsEndpoint; // only applicable on multi-zk mode
// We should NOT initiate _zkClient and anything that depends on _zkClient in
@@ -111,20 +114,9 @@ public class ServerContext implements IZkDataListener, IZkChildListener, IZkStat
synchronized (this) {
if (_zkClient == null) {
// If the multi ZK config is enabled, use FederatedZkClient on multi-realm mode
- if (Boolean.parseBoolean(System.getProperty(SystemPropertyKeys.MULTI_ZK_ENABLED))) {
- LOG.info("ServerContext: initializing FederatedZkClient with routing ZK at {}!",
- _zkAddr);
+ if (_isMultiZkEnabled || Boolean
+ .parseBoolean(System.getProperty(SystemPropertyKeys.MULTI_ZK_ENABLED))) {
try {
- RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder connectionConfigBuilder =
- new RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder();
- // If MSDS endpoint is set for this namespace, use that instead.
- if (_msdsEndpoint != null && !_msdsEndpoint.isEmpty()) {
- connectionConfigBuilder.setMsdsEndpoint(_msdsEndpoint);
- }
- _zkClient = new FederatedZkClient(connectionConfigBuilder.build(),
- new RealmAwareZkClient.RealmAwareZkClientConfig()
- .setZkSerializer(new ZNRecordSerializer()));
-
// Make sure the ServerContext is subscribed to routing data change so that it knows
// when to reset ZkClient and Helix APIs
if (_zkClientForListener == null) {
@@ -136,7 +128,20 @@ public class ServerContext implements IZkDataListener, IZkChildListener, IZkStat
// Refresh data subscription
_zkClientForListener.unsubscribeAll();
_zkClientForListener.subscribeRoutingDataChanges(this, this);
+ LOG.info("ServerContext: subscribed to routing data in routing ZK at {}!", _zkAddr);
+
+ RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder connectionConfigBuilder =
+ new RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder();
+ // If MSDS endpoint is set for this namespace, use that instead.
+ if (_msdsEndpoint != null && !_msdsEndpoint.isEmpty()) {
+ connectionConfigBuilder.setMsdsEndpoint(_msdsEndpoint);
+ }
+ _zkClient = new FederatedZkClient(connectionConfigBuilder.build(),
+ new RealmAwareZkClient.RealmAwareZkClientConfig()
+ .setZkSerializer(new ZNRecordSerializer()));
+ LOG.info("ServerContext: FederatedZkClient created successfully!");
} catch (IOException | InvalidRoutingDataException | IllegalStateException e) {
+ LOG.error("Failed to create FederatedZkClient!", e);
throw new HelixException("Failed to create FederatedZkClient!", e);
}
} else {
@@ -279,7 +284,13 @@ public class ServerContext implements IZkDataListener, IZkChildListener, IZkStat
@Override
public void handleDataDeleted(String dataPath) {
- // NOP because this is covered by handleChildChange()
+ if (_zkClientForListener == null || _zkClientForListener.isClosed()) {
+ return;
+ }
+ // Resubscribe
+ _zkClientForListener.unsubscribeAll();
+ _zkClientForListener.subscribeRoutingDataChanges(this, this);
+ resetZkResources();
}
@Override