You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by hu...@apache.org on 2020/04/08 22:54:18 UTC

[helix] 48/50: Make multiZkEnabled configurable in HelixRestNamespace (#915)

This is an automated email from the ASF dual-hosted git repository.

hulee pushed a commit to branch zooscalability_merge
in repository https://gitbox.apache.org/repos/asf/helix.git

commit 45cc0fc67973791144e8f61d962bc803d5ce48c2
Author: Hunter Lee <hu...@linkedin.com>
AuthorDate: Thu Mar 26 17:54:58 2020 -0700

    Make multiZkEnabled configurable in HelixRestNamespace (#915)
    
    It was observed that we need more fine-grained control over this multiZkEnabled config because there could exist namespaces with differing modes. Because multiple namespaces may be co-deployed, we cannot simply make it a system-wide config.
---
 .../org/apache/helix/manager/zk/ZKHelixAdmin.java  |  6 ++-
 .../helix/rest/common/HelixRestNamespace.java      | 18 ++++++++-
 .../apache/helix/rest/server/HelixRestMain.java    |  4 +-
 .../apache/helix/rest/server/HelixRestServer.java  |  3 +-
 .../apache/helix/rest/server/ServerContext.java    | 43 ++++++++++++++--------
 5 files changed, 52 insertions(+), 22 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
index 4f5b67e..5ded96d 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
@@ -936,8 +936,10 @@ public class ZKHelixAdmin implements HelixAdmin {
   public List<String> getClusters() {
     if (Boolean.getBoolean(SystemPropertyKeys.MULTI_ZK_ENABLED)
         || _zkClient instanceof FederatedZkClient) {
-      throw new UnsupportedOperationException(
-          "getClusters() is not supported in multi-realm mode! Use Metadata Store Directory Service instead!");
+      String errMsg =
+          "getClusters() is not supported in multi-realm mode! Use Metadata Store Directory Service instead!";
+      LOG.error(errMsg);
+      throw new UnsupportedOperationException(errMsg);
     }
 
     List<String> zkToplevelPathes = _zkClient.getChildren("/");
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/common/HelixRestNamespace.java b/helix-rest/src/main/java/org/apache/helix/rest/common/HelixRestNamespace.java
index 0632f36..29b1561 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/common/HelixRestNamespace.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/common/HelixRestNamespace.java
@@ -35,6 +35,7 @@ public class HelixRestNamespace {
     METADATA_STORE_TYPE,
     METADATA_STORE_ADDRESS,
     IS_DEFAULT,
+    MULTI_ZK_ENABLED,
     MSDS_ENDPOINT
   }
 
@@ -67,6 +68,11 @@ public class HelixRestNamespace {
   private boolean _isDefault;
 
   /**
+   * Flag indicating whether this namespace should have multi-zk feature enabled.
+   */
+  private boolean _isMultiZkEnabled;
+
+  /**
    * Endpoint for accessing MSDS for this namespace.
    */
   private String _msdsEndpoint;
@@ -77,15 +83,17 @@ public class HelixRestNamespace {
 
   public HelixRestNamespace(String name, HelixMetadataStoreType metadataStoreType,
       String metadataStoreAddress, boolean isDefault) throws IllegalArgumentException {
-    this(name, metadataStoreType, metadataStoreAddress, isDefault, null);
+    this(name, metadataStoreType, metadataStoreAddress, isDefault, false, null);
   }
 
   public HelixRestNamespace(String name, HelixMetadataStoreType metadataStoreType,
-      String metadataStoreAddress, boolean isDefault, String msdsEndpoint) {
+      String metadataStoreAddress, boolean isDefault, boolean isMultiZkEnabled,
+      String msdsEndpoint) {
     _name = name;
     _metadataStoreAddress = metadataStoreAddress;
     _metadataStoreType = metadataStoreType;
     _isDefault = isDefault;
+    _isMultiZkEnabled = isMultiZkEnabled;
     _msdsEndpoint = msdsEndpoint;
     validate();
   }
@@ -119,9 +127,15 @@ public class HelixRestNamespace {
     Map<String, String> ret = new HashMap<>();
     ret.put(HelixRestNamespaceProperty.NAME.name(), _name);
     ret.put(HelixRestNamespaceProperty.IS_DEFAULT.name(), String.valueOf(_isDefault));
+    ret.put(HelixRestNamespaceProperty.MULTI_ZK_ENABLED.name(), String.valueOf(_isMultiZkEnabled));
+    ret.put(HelixRestNamespaceProperty.MSDS_ENDPOINT.name(), String.valueOf(_msdsEndpoint));
     return ret;
   }
 
+  public boolean isMultiZkEnabled() {
+    return _isMultiZkEnabled;
+  }
+
   public String getMsdsEndpoint() {
     return _msdsEndpoint;
   }
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/HelixRestMain.java b/helix-rest/src/main/java/org/apache/helix/rest/server/HelixRestMain.java
index 49940c3..ffc8aed 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/server/HelixRestMain.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/server/HelixRestMain.java
@@ -151,7 +151,9 @@ public class HelixRestMain {
           HelixRestNamespace.HelixMetadataStoreType.valueOf(
               config.get(HelixRestNamespace.HelixRestNamespaceProperty.METADATA_STORE_TYPE.name())),
           config.get(HelixRestNamespace.HelixRestNamespaceProperty.METADATA_STORE_ADDRESS.name()),
-          false, config.get(HelixRestNamespace.HelixRestNamespaceProperty.MSDS_ENDPOINT.name())));
+          false, Boolean.parseBoolean(
+          config.get(HelixRestNamespace.HelixRestNamespaceProperty.MULTI_ZK_ENABLED.name())),
+          config.get(HelixRestNamespace.HelixRestNamespaceProperty.MSDS_ENDPOINT.name())));
     }
   }
 
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/HelixRestServer.java b/helix-rest/src/main/java/org/apache/helix/rest/server/HelixRestServer.java
index e6b5b34..cb199cb 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/server/HelixRestServer.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/server/HelixRestServer.java
@@ -149,7 +149,8 @@ public class HelixRestServer {
     // Enable the default statistical monitoring MBean for Jersey server
     cfg.property(ServerProperties.MONITORING_STATISTICS_MBEANS_ENABLED, true);
     cfg.property(ContextPropertyKeys.SERVER_CONTEXT.name(),
-        new ServerContext(namespace.getMetadataStoreAddress(), namespace.getMsdsEndpoint()));
+        new ServerContext(namespace.getMetadataStoreAddress(), namespace.isMultiZkEnabled(),
+            namespace.getMsdsEndpoint()));
     if (type == ServletType.DEFAULT_SERVLET) {
       cfg.property(ContextPropertyKeys.ALL_NAMESPACES.name(), _helixNamespaces);
     } else {
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/ServerContext.java b/helix-rest/src/main/java/org/apache/helix/rest/server/ServerContext.java
index 52f1738..c6632c2 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/server/ServerContext.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/server/ServerContext.java
@@ -61,6 +61,7 @@ public class ServerContext implements IZkDataListener, IZkChildListener, IZkStat
   private static final Logger LOG = LoggerFactory.getLogger(ServerContext.class);
 
   private final String _zkAddr;
+  private boolean _isMultiZkEnabled;
   private final String _msdsEndpoint;
   private volatile RealmAwareZkClient _zkClient;
 
@@ -83,16 +84,18 @@ public class ServerContext implements IZkDataListener, IZkChildListener, IZkStat
   private RealmAwareZkClient _zkClientForListener;
 
   public ServerContext(String zkAddr) {
-    this(zkAddr, null);
+    this(zkAddr, false, null);
   }
 
   /**
    * Initializes a ServerContext for this namespace.
    * @param zkAddr routing ZK address (on multi-zk mode)
+   * @param isMultiZkEnabled boolean flag for whether multi-zk mode is enabled
    * @param msdsEndpoint if given, this server context will try to read routing data from this MSDS.
    */
-  public ServerContext(String zkAddr, String msdsEndpoint) {
+  public ServerContext(String zkAddr, boolean isMultiZkEnabled, String msdsEndpoint) {
     _zkAddr = zkAddr;
+    _isMultiZkEnabled = isMultiZkEnabled;
     _msdsEndpoint = msdsEndpoint; // only applicable on multi-zk mode
 
     // We should NOT initiate _zkClient and anything that depends on _zkClient in
@@ -111,20 +114,9 @@ public class ServerContext implements IZkDataListener, IZkChildListener, IZkStat
       synchronized (this) {
         if (_zkClient == null) {
           // If the multi ZK config is enabled, use FederatedZkClient on multi-realm mode
-          if (Boolean.parseBoolean(System.getProperty(SystemPropertyKeys.MULTI_ZK_ENABLED))) {
-            LOG.info("ServerContext: initializing FederatedZkClient with routing ZK at {}!",
-                _zkAddr);
+          if (_isMultiZkEnabled || Boolean
+              .parseBoolean(System.getProperty(SystemPropertyKeys.MULTI_ZK_ENABLED))) {
             try {
-              RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder connectionConfigBuilder =
-                  new RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder();
-              // If MSDS endpoint is set for this namespace, use that instead.
-              if (_msdsEndpoint != null && !_msdsEndpoint.isEmpty()) {
-                connectionConfigBuilder.setMsdsEndpoint(_msdsEndpoint);
-              }
-              _zkClient = new FederatedZkClient(connectionConfigBuilder.build(),
-                  new RealmAwareZkClient.RealmAwareZkClientConfig()
-                      .setZkSerializer(new ZNRecordSerializer()));
-
               // Make sure the ServerContext is subscribed to routing data change so that it knows
               // when to reset ZkClient and Helix APIs
               if (_zkClientForListener == null) {
@@ -136,7 +128,20 @@ public class ServerContext implements IZkDataListener, IZkChildListener, IZkStat
               // Refresh data subscription
               _zkClientForListener.unsubscribeAll();
               _zkClientForListener.subscribeRoutingDataChanges(this, this);
+              LOG.info("ServerContext: subscribed to routing data in routing ZK at {}!", _zkAddr);
+
+              RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder connectionConfigBuilder =
+                  new RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder();
+              // If MSDS endpoint is set for this namespace, use that instead.
+              if (_msdsEndpoint != null && !_msdsEndpoint.isEmpty()) {
+                connectionConfigBuilder.setMsdsEndpoint(_msdsEndpoint);
+              }
+              _zkClient = new FederatedZkClient(connectionConfigBuilder.build(),
+                  new RealmAwareZkClient.RealmAwareZkClientConfig()
+                      .setZkSerializer(new ZNRecordSerializer()));
+              LOG.info("ServerContext: FederatedZkClient created successfully!");
             } catch (IOException | InvalidRoutingDataException | IllegalStateException e) {
+              LOG.error("Failed to create FederatedZkClient!", e);
               throw new HelixException("Failed to create FederatedZkClient!", e);
             }
           } else {
@@ -279,7 +284,13 @@ public class ServerContext implements IZkDataListener, IZkChildListener, IZkStat
 
   @Override
   public void handleDataDeleted(String dataPath) {
-    // NOP because this is covered by handleChildChange()
+    if (_zkClientForListener == null || _zkClientForListener.isClosed()) {
+      return;
+    }
+    // Resubscribe
+    _zkClientForListener.unsubscribeAll();
+    _zkClientForListener.subscribeRoutingDataChanges(this, this);
+    resetZkResources();
   }
 
   @Override