You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by hu...@apache.org on 2020/04/08 22:54:17 UTC
[helix] 47/50: Make Helix REST realm-aware (#908)
This is an automated email from the ASF dual-hosted git repository.
hulee pushed a commit to branch zooscalability_merge
in repository https://gitbox.apache.org/repos/asf/helix.git
commit cf047b52abef4c089dbc7f220ace44db1d5e17db
Author: Hunter Lee <hu...@linkedin.com>
AuthorDate: Wed Mar 25 22:20:10 2020 -0700
Make Helix REST realm-aware (#908)
Helix REST needs to use a realm-aware ZkClient in multi-zk mode. It also needs to listen for routing data changes so that the HelixRestServer does not have to be restarted every time the routing data is updated.
Changelist:
Make ServerContext listen on routing data paths when running in multi-zk mode
Make HelixRestServer use RealmAwareZkClient (FederatedZkClient) in multi-zk mode
---
.../java/org/apache/helix/task/TaskDriver.java | 6 +-
.../helix/rest/common/HelixRestNamespace.java | 24 +-
.../metadatastore/ZkMetadataStoreDirectory.java | 55 +++-
.../accessor/ZkRoutingDataReader.java | 37 +--
.../accessor/ZkRoutingDataWriter.java | 17 +-
.../apache/helix/rest/server/HelixRestMain.java | 6 +-
.../apache/helix/rest/server/HelixRestServer.java | 2 +-
.../apache/helix/rest/server/ServerContext.java | 289 +++++++++++++++++----
.../resources/helix/AbstractHelixResource.java | 14 +-
.../server/resources/helix/ClusterAccessor.java | 16 +-
.../server/resources/helix/ResourceAccessor.java | 10 +-
.../integration/TestRoutingDataUpdate.java | 176 +++++++++++++
.../helix/rest/server/AbstractTestClass.java | 40 +--
.../constant/MetadataStoreRoutingConstants.java | 2 +
.../helix/msdcommon/util/ZkValidationUtil.java | 3 +-
pom.xml | 34 ---
.../zookeeper/api/client/RealmAwareZkClient.java | 16 ++
.../zookeeper/util/HttpRoutingDataReader.java | 14 +-
.../impl/client/RealmAwareZkClientTestBase.java | 2 +-
19 files changed, 573 insertions(+), 190 deletions(-)
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
index 987cc44..506a06b 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
@@ -47,7 +47,7 @@ import org.apache.helix.model.builder.CustomModeISBuilder;
import org.apache.helix.store.HelixPropertyStore;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.helix.util.HelixUtil;
-import org.apache.helix.zookeeper.api.client.HelixZkClient;
+import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.helix.zookeeper.zkclient.DataUpdater;
import org.slf4j.Logger;
@@ -98,12 +98,12 @@ public class TaskDriver {
}
@Deprecated
- public TaskDriver(HelixZkClient client, String clusterName) {
+ public TaskDriver(RealmAwareZkClient client, String clusterName) {
this(client, new ZkBaseDataAccessor<>(client), clusterName);
}
@Deprecated
- public TaskDriver(HelixZkClient client, ZkBaseDataAccessor<ZNRecord> baseAccessor,
+ public TaskDriver(RealmAwareZkClient client, ZkBaseDataAccessor<ZNRecord> baseAccessor,
String clusterName) {
this(new ZKHelixAdmin(client), new ZKHelixDataAccessor(clusterName, baseAccessor),
new ZkHelixPropertyStore<>(baseAccessor, PropertyPathBuilder.propertyStore(clusterName),
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/common/HelixRestNamespace.java b/helix-rest/src/main/java/org/apache/helix/rest/common/HelixRestNamespace.java
index a2fb52c..0632f36 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/common/HelixRestNamespace.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/common/HelixRestNamespace.java
@@ -34,7 +34,8 @@ public class HelixRestNamespace {
NAME,
METADATA_STORE_TYPE,
METADATA_STORE_ADDRESS,
- IS_DEFAULT
+ IS_DEFAULT,
+ MSDS_ENDPOINT
}
/**
@@ -55,7 +56,7 @@ public class HelixRestNamespace {
private HelixMetadataStoreType _metadataStoreType;
/**
- * Address of metadata store. Should be informat of
+ * Address of metadata store. Should be in the format of
* "[ip-address]:[port]" or "[dns-name]:[port]"
*/
private String _metadataStoreAddress;
@@ -65,16 +66,27 @@ public class HelixRestNamespace {
*/
private boolean _isDefault;
+ /**
+ * Endpoint for accessing MSDS for this namespace.
+ */
+ private String _msdsEndpoint;
+
public HelixRestNamespace(String metadataStoreAddress) throws IllegalArgumentException {
this(DEFAULT_NAMESPACE_NAME, HelixMetadataStoreType.ZOOKEEPER, metadataStoreAddress, true);
}
- public HelixRestNamespace(String name, HelixMetadataStoreType metadataStoreType, String metadataStoreAddress, boolean isDefault)
- throws IllegalArgumentException {
+ public HelixRestNamespace(String name, HelixMetadataStoreType metadataStoreType,
+ String metadataStoreAddress, boolean isDefault) throws IllegalArgumentException {
+ this(name, metadataStoreType, metadataStoreAddress, isDefault, null);
+ }
+
+ public HelixRestNamespace(String name, HelixMetadataStoreType metadataStoreType,
+ String metadataStoreAddress, boolean isDefault, String msdsEndpoint) {
_name = name;
_metadataStoreAddress = metadataStoreAddress;
_metadataStoreType = metadataStoreType;
_isDefault = isDefault;
+ _msdsEndpoint = msdsEndpoint;
validate();
}
@@ -109,4 +121,8 @@ public class HelixRestNamespace {
ret.put(HelixRestNamespaceProperty.IS_DEFAULT.name(), String.valueOf(_isDefault));
return ret;
}
+
+ public String getMsdsEndpoint() {
+ return _msdsEndpoint;
+ }
}
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/metadatastore/ZkMetadataStoreDirectory.java b/helix-rest/src/main/java/org/apache/helix/rest/metadatastore/ZkMetadataStoreDirectory.java
index c83245f..42b2b17 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/metadatastore/ZkMetadataStoreDirectory.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/metadatastore/ZkMetadataStoreDirectory.java
@@ -39,6 +39,7 @@ import org.apache.helix.rest.metadatastore.accessor.MetadataStoreRoutingDataWrit
import org.apache.helix.rest.metadatastore.accessor.ZkRoutingDataReader;
import org.apache.helix.rest.metadatastore.accessor.ZkRoutingDataWriter;
import org.apache.helix.zookeeper.api.client.HelixZkClient;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.helix.zookeeper.datamodel.serializer.ZNRecordSerializer;
import org.apache.helix.zookeeper.impl.factory.DedicatedZkClientFactory;
import org.apache.helix.zookeeper.zkclient.exception.ZkNodeExistsException;
@@ -98,20 +99,27 @@ public class ZkMetadataStoreDirectory implements MetadataStoreDirectory, Routing
if (!_routingZkAddressMap.containsKey(namespace)) {
synchronized (_routingZkAddressMap) {
if (!_routingZkAddressMap.containsKey(namespace)) {
- // Ensure that ROUTING_DATA_PATH exists in ZK.
- HelixZkClient zkClient = DedicatedZkClientFactory.getInstance()
- .buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddress),
- new HelixZkClient.ZkClientConfig().setZkSerializer(new ZNRecordSerializer()));
+ HelixZkClient zkClient = null;
try {
- zkClient.createPersistent(MetadataStoreRoutingConstants.ROUTING_DATA_PATH, true);
- } catch (ZkNodeExistsException e) {
- // The node already exists and it's okay
+ // Ensure that ROUTING_DATA_PATH exists in ZK.
+ zkClient = DedicatedZkClientFactory.getInstance()
+ .buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddress),
+ new HelixZkClient.ZkClientConfig().setZkSerializer(new ZNRecordSerializer()));
+ createRoutingDataPath(zkClient, zkAddress);
+ } finally {
+ if (zkClient != null && !zkClient.isClosed()) {
+ zkClient.close();
+ }
+ }
+ try {
+ _routingZkAddressMap.put(namespace, zkAddress);
+ _routingDataReaderMap
+ .put(namespace, new ZkRoutingDataReader(namespace, zkAddress, this));
+ _routingDataWriterMap.put(namespace, new ZkRoutingDataWriter(namespace, zkAddress));
+ } catch (IllegalArgumentException | IllegalStateException e) {
+ LOG.error("ZkMetadataStoreDirectory: initializing ZkRoutingDataReader/Writer failed!",
+ e);
}
-
- _routingZkAddressMap.put(namespace, zkAddress);
- _routingDataReaderMap.put(namespace, new ZkRoutingDataReader(namespace, zkAddress, this));
- _routingDataWriterMap.put(namespace, new ZkRoutingDataWriter(namespace, zkAddress));
-
// Populate realmToShardingKeys with ZkRoutingDataReader
Map<String, List<String>> rawRoutingData =
_routingDataReaderMap.get(namespace).getRoutingData();
@@ -119,7 +127,8 @@ public class ZkMetadataStoreDirectory implements MetadataStoreDirectory, Routing
try {
_routingDataMap.put(namespace, new TrieRoutingData(rawRoutingData));
} catch (InvalidRoutingDataException e) {
- LOG.warn("TrieRoutingData is not created for namespace {}", namespace, e);
+ LOG.warn("ZkMetadataStoreDirectory: TrieRoutingData is not created for namespace {}",
+ namespace, e);
}
}
}
@@ -145,7 +154,7 @@ public class ZkMetadataStoreDirectory implements MetadataStoreDirectory, Routing
throw new NoSuchElementException("Namespace " + namespace + " does not exist!");
}
Set<String> allShardingKeys = new HashSet<>();
- _realmToShardingKeysMap.get(namespace).values().forEach(keys -> allShardingKeys.addAll(keys));
+ _realmToShardingKeysMap.get(namespace).values().forEach(allShardingKeys::addAll);
return allShardingKeys;
}
@@ -339,4 +348,22 @@ public class ZkMetadataStoreDirectory implements MetadataStoreDirectory, Routing
_routingDataMap.clear();
_zkMetadataStoreDirectoryInstance = null;
}
+
+ /**
+ * Make sure the root routing data path exists. Also, register the routing ZK address.
+ * @param zkClient
+ */
+ public static void createRoutingDataPath(HelixZkClient zkClient, String zkAddress) {
+ try {
+ zkClient.createPersistent(MetadataStoreRoutingConstants.ROUTING_DATA_PATH, true);
+ } catch (ZkNodeExistsException e) {
+ // The node already exists and it's okay
+ }
+ // Make sure ROUTING_DATA_PATH is mapped to the routing ZK so that FederatedZkClient used
+ // in Helix REST can subscribe to the routing data path
+ ZNRecord znRecord = new ZNRecord(MetadataStoreRoutingConstants.ROUTING_DATA_PATH.substring(1));
+ znRecord.setListField(MetadataStoreRoutingConstants.ROUTING_ZK_ADDRESS_KEY,
+ Collections.singletonList(zkAddress));
+ zkClient.writeData(MetadataStoreRoutingConstants.ROUTING_DATA_PATH, znRecord);
+ }
}
\ No newline at end of file
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/metadatastore/accessor/ZkRoutingDataReader.java b/helix-rest/src/main/java/org/apache/helix/rest/metadatastore/accessor/ZkRoutingDataReader.java
index 6c75618..cfe6eb5 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/metadatastore/accessor/ZkRoutingDataReader.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/metadatastore/accessor/ZkRoutingDataReader.java
@@ -27,7 +27,9 @@ import java.util.Map;
import org.apache.helix.msdcommon.callback.RoutingDataListener;
import org.apache.helix.msdcommon.constant.MetadataStoreRoutingConstants;
import org.apache.helix.msdcommon.exception.InvalidRoutingDataException;
+import org.apache.helix.rest.metadatastore.ZkMetadataStoreDirectory;
import org.apache.helix.zookeeper.api.client.HelixZkClient;
+import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.helix.zookeeper.datamodel.serializer.ZNRecordSerializer;
import org.apache.helix.zookeeper.impl.factory.DedicatedZkClientFactory;
@@ -35,7 +37,6 @@ import org.apache.helix.zookeeper.zkclient.IZkChildListener;
import org.apache.helix.zookeeper.zkclient.IZkDataListener;
import org.apache.helix.zookeeper.zkclient.IZkStateListener;
import org.apache.helix.zookeeper.zkclient.exception.ZkNoNodeException;
-import org.apache.helix.zookeeper.zkclient.exception.ZkNodeExistsException;
import org.apache.zookeeper.Watcher;
@@ -59,24 +60,11 @@ public class ZkRoutingDataReader implements MetadataStoreRoutingDataReader, IZkD
.buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddress),
new HelixZkClient.ZkClientConfig().setZkSerializer(new ZNRecordSerializer()));
- // Ensure that ROUTING_DATA_PATH exists in ZK. If not, create
- // create() semantic will fail if it already exists
- try {
- _zkClient.createPersistent(MetadataStoreRoutingConstants.ROUTING_DATA_PATH, true);
- } catch (ZkNodeExistsException e) {
- // This is okay
- }
+ ZkMetadataStoreDirectory.createRoutingDataPath(_zkClient, _zkAddress);
_routingDataListener = routingDataListener;
if (_routingDataListener != null) {
- // Subscribe child changes
- _zkClient.subscribeChildChanges(MetadataStoreRoutingConstants.ROUTING_DATA_PATH, this);
- // Subscribe data changes
- for (String child : _zkClient.getChildren(MetadataStoreRoutingConstants.ROUTING_DATA_PATH)) {
- _zkClient
- .subscribeDataChanges(MetadataStoreRoutingConstants.ROUTING_DATA_PATH + "/" + child,
- this);
- }
+ _zkClient.subscribeRoutingDataChanges(this, this);
}
}
@@ -118,7 +106,7 @@ public class ZkRoutingDataReader implements MetadataStoreRoutingDataReader, IZkD
@Override
public synchronized void handleDataChange(String s, Object o) {
- if (_zkClient.isClosed()) {
+ if (_zkClient == null || _zkClient.isClosed()) {
return;
}
_routingDataListener.refreshRoutingData(_namespace);
@@ -138,7 +126,7 @@ public class ZkRoutingDataReader implements MetadataStoreRoutingDataReader, IZkD
@Override
public synchronized void handleStateChanged(Watcher.Event.KeeperState state) {
- if (_zkClient.isClosed()) {
+ if (_zkClient == null || _zkClient.isClosed()) {
return;
}
_routingDataListener.refreshRoutingData(_namespace);
@@ -146,7 +134,7 @@ public class ZkRoutingDataReader implements MetadataStoreRoutingDataReader, IZkD
@Override
public synchronized void handleNewSession(String sessionId) {
- if (_zkClient.isClosed()) {
+ if (_zkClient == null || _zkClient.isClosed()) {
return;
}
_routingDataListener.refreshRoutingData(_namespace);
@@ -154,24 +142,19 @@ public class ZkRoutingDataReader implements MetadataStoreRoutingDataReader, IZkD
@Override
public synchronized void handleSessionEstablishmentError(Throwable error) {
- if (_zkClient.isClosed()) {
+ if (_zkClient == null || _zkClient.isClosed()) {
return;
}
_routingDataListener.refreshRoutingData(_namespace);
}
private void handleResubscription() {
- if (_zkClient.isClosed()) {
+ if (_zkClient == null || _zkClient.isClosed()) {
return;
}
-
// Renew subscription
_zkClient.unsubscribeAll();
- _zkClient.subscribeChildChanges(MetadataStoreRoutingConstants.ROUTING_DATA_PATH, this);
- for (String child : _zkClient.getChildren(MetadataStoreRoutingConstants.ROUTING_DATA_PATH)) {
- _zkClient.subscribeDataChanges(MetadataStoreRoutingConstants.ROUTING_DATA_PATH + "/" + child,
- this);
- }
+ _zkClient.subscribeRoutingDataChanges(this, this);
_routingDataListener.refreshRoutingData(_namespace);
}
}
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/metadatastore/accessor/ZkRoutingDataWriter.java b/helix-rest/src/main/java/org/apache/helix/rest/metadatastore/accessor/ZkRoutingDataWriter.java
index 32b7681..791d9bb 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/metadatastore/accessor/ZkRoutingDataWriter.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/metadatastore/accessor/ZkRoutingDataWriter.java
@@ -30,6 +30,7 @@ import javax.ws.rs.core.Response;
import org.apache.helix.msdcommon.constant.MetadataStoreRoutingConstants;
import org.apache.helix.rest.common.HttpConstants;
+import org.apache.helix.rest.metadatastore.ZkMetadataStoreDirectory;
import org.apache.helix.rest.metadatastore.concurrency.ZkDistributedLeaderElection;
import org.apache.helix.zookeeper.api.client.HelixZkClient;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
@@ -82,20 +83,16 @@ public class ZkRoutingDataWriter implements MetadataStoreRoutingDataWriter {
.buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddress),
new HelixZkClient.ZkClientConfig().setZkSerializer(new ZNRecordSerializer()));
- // Ensure that ROUTING_DATA_PATH exists in ZK. If not, create
- // create() semantic will fail if it already exists
- try {
- _zkClient.createPersistent(MetadataStoreRoutingConstants.ROUTING_DATA_PATH, true);
- } catch (ZkNodeExistsException e) {
- // This is okay
- }
+ ZkMetadataStoreDirectory.createRoutingDataPath(_zkClient, zkAddress);
// Get the hostname (REST endpoint) from System property
String hostName = System.getProperty(MetadataStoreRoutingConstants.MSDS_SERVER_HOSTNAME_KEY);
if (hostName == null || hostName.isEmpty()) {
- throw new IllegalStateException(
- "Hostname is not set or is empty. System.getProperty fails to fetch "
- + MetadataStoreRoutingConstants.MSDS_SERVER_HOSTNAME_KEY + ".");
+ String errMsg =
+ "ZkRoutingDataWriter: Hostname is not set or is empty. System.getProperty fails to fetch "
+ + MetadataStoreRoutingConstants.MSDS_SERVER_HOSTNAME_KEY;
+ LOG.error(errMsg);
+ throw new IllegalStateException(errMsg);
}
_myHostName = HttpConstants.HTTP_PROTOCOL_PREFIX + hostName;
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/HelixRestMain.java b/helix-rest/src/main/java/org/apache/helix/rest/server/HelixRestMain.java
index b28f227..49940c3 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/server/HelixRestMain.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/server/HelixRestMain.java
@@ -146,10 +146,12 @@ public class HelixRestMain {
// Currently we don't support adding default namespace through yaml manifest so all
// namespaces created here will not be default
// TODO: support specifying default namespace from config file
- namespaces.add(new HelixRestNamespace(config.get(HelixRestNamespace.HelixRestNamespaceProperty.NAME.name()),
+ namespaces.add(new HelixRestNamespace(
+ config.get(HelixRestNamespace.HelixRestNamespaceProperty.NAME.name()),
HelixRestNamespace.HelixMetadataStoreType.valueOf(
config.get(HelixRestNamespace.HelixRestNamespaceProperty.METADATA_STORE_TYPE.name())),
- config.get(HelixRestNamespace.HelixRestNamespaceProperty.METADATA_STORE_ADDRESS.name()), false));
+ config.get(HelixRestNamespace.HelixRestNamespaceProperty.METADATA_STORE_ADDRESS.name()),
+ false, config.get(HelixRestNamespace.HelixRestNamespaceProperty.MSDS_ENDPOINT.name())));
}
}
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/HelixRestServer.java b/helix-rest/src/main/java/org/apache/helix/rest/server/HelixRestServer.java
index 64c1139..e6b5b34 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/server/HelixRestServer.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/server/HelixRestServer.java
@@ -149,7 +149,7 @@ public class HelixRestServer {
// Enable the default statistical monitoring MBean for Jersey server
cfg.property(ServerProperties.MONITORING_STATISTICS_MBEANS_ENABLED, true);
cfg.property(ContextPropertyKeys.SERVER_CONTEXT.name(),
- new ServerContext(namespace.getMetadataStoreAddress()));
+ new ServerContext(namespace.getMetadataStoreAddress(), namespace.getMsdsEndpoint()));
if (type == ServletType.DEFAULT_SERVLET) {
cfg.property(ContextPropertyKeys.ALL_NAMESPACES.name(), _helixNamespaces);
} else {
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/ServerContext.java b/helix-rest/src/main/java/org/apache/helix/rest/server/ServerContext.java
index b845356..52f1738 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/server/ServerContext.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/server/ServerContext.java
@@ -20,34 +20,53 @@ package org.apache.helix.rest.server;
* under the License.
*/
-import java.util.HashMap;
+import java.io.IOException;
+import java.util.List;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import org.apache.helix.ConfigAccessor;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixException;
import org.apache.helix.InstanceType;
-import org.apache.helix.rest.metadatastore.ZkMetadataStoreDirectory;
-import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.helix.SystemPropertyKeys;
import org.apache.helix.manager.zk.ZKHelixAdmin;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
-import org.apache.helix.manager.zk.ZNRecordSerializer;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
-import org.apache.helix.zookeeper.impl.client.ZkClient;
-import org.apache.helix.zookeeper.api.client.HelixZkClient;
-import org.apache.helix.zookeeper.impl.factory.SharedZkClientFactory;
+import org.apache.helix.msdcommon.exception.InvalidRoutingDataException;
+import org.apache.helix.rest.metadatastore.ZkMetadataStoreDirectory;
import org.apache.helix.task.TaskDriver;
import org.apache.helix.tools.ClusterSetup;
+import org.apache.helix.zookeeper.api.client.HelixZkClient;
+import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.helix.zookeeper.datamodel.serializer.ZNRecordSerializer;
+import org.apache.helix.zookeeper.impl.client.FederatedZkClient;
+import org.apache.helix.zookeeper.impl.client.ZkClient;
+import org.apache.helix.zookeeper.impl.factory.DedicatedZkClientFactory;
+import org.apache.helix.zookeeper.impl.factory.SharedZkClientFactory;
+import org.apache.helix.zookeeper.util.HttpRoutingDataReader;
+import org.apache.helix.zookeeper.zkclient.IZkChildListener;
+import org.apache.helix.zookeeper.zkclient.IZkDataListener;
+import org.apache.helix.zookeeper.zkclient.IZkStateListener;
import org.apache.helix.zookeeper.zkclient.exception.ZkMarshallingError;
import org.apache.helix.zookeeper.zkclient.serialize.ZkSerializer;
+import org.apache.zookeeper.Watcher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ServerContext implements IZkDataListener, IZkChildListener, IZkStateListener {
+ private static final Logger LOG = LoggerFactory.getLogger(ServerContext.class);
-public class ServerContext {
private final String _zkAddr;
- private HelixZkClient _zkClient;
- private ZKHelixAdmin _zkHelixAdmin;
- private ClusterSetup _clusterSetup;
- private ConfigAccessor _configAccessor;
+ private final String _msdsEndpoint;
+ private volatile RealmAwareZkClient _zkClient;
+
+ private volatile ZKHelixAdmin _zkHelixAdmin;
+ private volatile ClusterSetup _clusterSetup;
+ private volatile ConfigAccessor _configAccessor;
// A lazily-initialized base data accessor that reads/writes byte array to ZK
// TODO: Only read (deserialize) is supported at this time. This baseDataAccessor should support write (serialize) as needs arise
private volatile ZkBaseDataAccessor<byte[]> _byteArrayZkBaseDataAccessor;
@@ -55,76 +74,149 @@ public class ServerContext {
private final Map<String, HelixDataAccessor> _helixDataAccessorPool;
// 1 Cluster name will correspond to 1 task driver
private final Map<String, TaskDriver> _taskDriverPool;
+
+ /**
+ * Multi-ZK support
+ */
private ZkMetadataStoreDirectory _zkMetadataStoreDirectory;
+ // Create a dedicated ZkClient for listening to data changes in routing data
+ private RealmAwareZkClient _zkClientForListener;
public ServerContext(String zkAddr) {
+ this(zkAddr, null);
+ }
+
+ /**
+ * Initializes a ServerContext for this namespace.
+ * @param zkAddr routing ZK address (on multi-zk mode)
+ * @param msdsEndpoint if given, this server context will try to read routing data from this MSDS.
+ */
+ public ServerContext(String zkAddr, String msdsEndpoint) {
_zkAddr = zkAddr;
+ _msdsEndpoint = msdsEndpoint; // only applicable on multi-zk mode
// We should NOT initiate _zkClient and anything that depends on _zkClient in
// constructor, as it is reasonable to start up HelixRestServer first and then
// ZooKeeper. In this case, initializing _zkClient will fail and HelixRestServer
// cannot be started correctly.
- _helixDataAccessorPool = new HashMap<>();
- _taskDriverPool = new HashMap<>();
+ _helixDataAccessorPool = new ConcurrentHashMap<>();
+ _taskDriverPool = new ConcurrentHashMap<>();
+
// Initialize the singleton ZkMetadataStoreDirectory instance to allow it to be closed later
_zkMetadataStoreDirectory = ZkMetadataStoreDirectory.getInstance();
}
- public HelixZkClient getHelixZkClient() {
+ public RealmAwareZkClient getRealmAwareZkClient() {
if (_zkClient == null) {
- HelixZkClient.ZkClientConfig clientConfig = new HelixZkClient.ZkClientConfig();
- clientConfig.setZkSerializer(new ZNRecordSerializer());
- _zkClient = SharedZkClientFactory.getInstance()
- .buildZkClient(new HelixZkClient.ZkConnectionConfig(_zkAddr), clientConfig);
+ synchronized (this) {
+ if (_zkClient == null) {
+ // If the multi ZK config is enabled, use FederatedZkClient on multi-realm mode
+ if (Boolean.parseBoolean(System.getProperty(SystemPropertyKeys.MULTI_ZK_ENABLED))) {
+ LOG.info("ServerContext: initializing FederatedZkClient with routing ZK at {}!",
+ _zkAddr);
+ try {
+ RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder connectionConfigBuilder =
+ new RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder();
+ // If MSDS endpoint is set for this namespace, use that instead.
+ if (_msdsEndpoint != null && !_msdsEndpoint.isEmpty()) {
+ connectionConfigBuilder.setMsdsEndpoint(_msdsEndpoint);
+ }
+ _zkClient = new FederatedZkClient(connectionConfigBuilder.build(),
+ new RealmAwareZkClient.RealmAwareZkClientConfig()
+ .setZkSerializer(new ZNRecordSerializer()));
+
+ // Make sure the ServerContext is subscribed to routing data change so that it knows
+ // when to reset ZkClient and Helix APIs
+ if (_zkClientForListener == null) {
+ _zkClientForListener = DedicatedZkClientFactory.getInstance()
+ .buildZkClient(new HelixZkClient.ZkConnectionConfig(_zkAddr),
+ new HelixZkClient.ZkClientConfig()
+ .setZkSerializer(new ZNRecordSerializer()));
+ }
+ // Refresh data subscription
+ _zkClientForListener.unsubscribeAll();
+ _zkClientForListener.subscribeRoutingDataChanges(this, this);
+ } catch (IOException | InvalidRoutingDataException | IllegalStateException e) {
+ throw new HelixException("Failed to create FederatedZkClient!", e);
+ }
+ } else {
+ // If multi ZK config is not set, just connect to the ZK address given
+ HelixZkClient.ZkClientConfig clientConfig = new HelixZkClient.ZkClientConfig();
+ clientConfig.setZkSerializer(new ZNRecordSerializer());
+ _zkClient = SharedZkClientFactory.getInstance()
+ .buildZkClient(new HelixZkClient.ZkConnectionConfig(_zkAddr), clientConfig);
+ }
+ }
+ }
}
return _zkClient;
}
@Deprecated
public ZkClient getZkClient() {
- return (ZkClient) getHelixZkClient();
+ return (ZkClient) getRealmAwareZkClient();
}
public HelixAdmin getHelixAdmin() {
if (_zkHelixAdmin == null) {
- _zkHelixAdmin = new ZKHelixAdmin(getHelixZkClient());
+ synchronized (this) {
+ if (_zkHelixAdmin == null) {
+ _zkHelixAdmin = new ZKHelixAdmin(getRealmAwareZkClient());
+ }
+ }
}
return _zkHelixAdmin;
}
public ClusterSetup getClusterSetup() {
if (_clusterSetup == null) {
- _clusterSetup = new ClusterSetup(getHelixZkClient(), getHelixAdmin());
+ synchronized (this) {
+ if (_clusterSetup == null) {
+ _clusterSetup = new ClusterSetup(getRealmAwareZkClient(), getHelixAdmin());
+ }
+ }
}
return _clusterSetup;
}
public TaskDriver getTaskDriver(String clusterName) {
- synchronized (_taskDriverPool) {
- if (!_taskDriverPool.containsKey(clusterName)) {
- _taskDriverPool.put(clusterName, new TaskDriver(getHelixZkClient(), clusterName));
+ TaskDriver taskDriver = _taskDriverPool.get(clusterName);
+ if (taskDriver == null) {
+ synchronized (this) {
+ if (!_taskDriverPool.containsKey(clusterName)) {
+ _taskDriverPool.put(clusterName, new TaskDriver(getRealmAwareZkClient(), clusterName));
+ }
+ taskDriver = _taskDriverPool.get(clusterName);
}
- return _taskDriverPool.get(clusterName);
}
+ return taskDriver;
}
public ConfigAccessor getConfigAccessor() {
if (_configAccessor == null) {
- _configAccessor = new ConfigAccessor(getHelixZkClient());
+ synchronized (this) {
+ if (_configAccessor == null) {
+ _configAccessor = new ConfigAccessor(getRealmAwareZkClient());
+ }
+ }
}
return _configAccessor;
}
- public HelixDataAccessor getDataAccssor(String clusterName) {
- synchronized (_helixDataAccessorPool) {
- if (!_helixDataAccessorPool.containsKey(clusterName)) {
- ZkBaseDataAccessor<ZNRecord> baseDataAccessor =
- new ZkBaseDataAccessor<>(getHelixZkClient());
- _helixDataAccessorPool.put(clusterName,
- new ZKHelixDataAccessor(clusterName, InstanceType.ADMINISTRATOR, baseDataAccessor));
+ public HelixDataAccessor getDataAccessor(String clusterName) {
+ HelixDataAccessor dataAccessor = _helixDataAccessorPool.get(clusterName);
+ if (dataAccessor == null) {
+ synchronized (this) {
+ if (!_helixDataAccessorPool.containsKey(clusterName)) {
+ ZkBaseDataAccessor<ZNRecord> baseDataAccessor =
+ new ZkBaseDataAccessor<>(getRealmAwareZkClient());
+ _helixDataAccessorPool.put(clusterName,
+ new ZKHelixDataAccessor(clusterName, InstanceType.ADMINISTRATOR, baseDataAccessor));
+ }
+ dataAccessor = _helixDataAccessorPool.get(clusterName);
}
- return _helixDataAccessorPool.get(clusterName);
}
+ return dataAccessor;
}
/**
@@ -132,30 +224,26 @@ public class ServerContext {
* @return
*/
public ZkBaseDataAccessor<byte[]> getByteArrayZkBaseDataAccessor() {
- ZkBaseDataAccessor<byte[]> byteArrayZkBaseDataAccessor = _byteArrayZkBaseDataAccessor;
- if (byteArrayZkBaseDataAccessor != null) { // First check (no locking)
- return byteArrayZkBaseDataAccessor;
- }
+ if (_byteArrayZkBaseDataAccessor == null) {
+ synchronized (this) {
+ if (_byteArrayZkBaseDataAccessor == null) {
- synchronized (this) {
- if (_byteArrayZkBaseDataAccessor == null) { // Second check (with locking)
- _byteArrayZkBaseDataAccessor = new ZkBaseDataAccessor<>(_zkAddr, new ZkSerializer() {
- @Override
- public byte[] serialize(Object o)
- throws ZkMarshallingError {
- // TODO: Support serialize for write methods if necessary
- throw new UnsupportedOperationException("serialize() is not supported.");
- }
+ _byteArrayZkBaseDataAccessor = new ZkBaseDataAccessor<>(_zkAddr, new ZkSerializer() {
+ @Override
+ public byte[] serialize(Object o) throws ZkMarshallingError {
+ // TODO: Support serialize for write methods if necessary
+ throw new UnsupportedOperationException("serialize() is not supported.");
+ }
- @Override
- public Object deserialize(byte[] bytes)
- throws ZkMarshallingError {
- return bytes;
- }
- });
+ @Override
+ public Object deserialize(byte[] bytes) throws ZkMarshallingError {
+ return bytes;
+ }
+ });
+ }
}
- return _byteArrayZkBaseDataAccessor;
}
+ return _byteArrayZkBaseDataAccessor;
}
public void close() {
@@ -165,5 +253,96 @@ public class ServerContext {
if (_zkMetadataStoreDirectory != null) {
_zkMetadataStoreDirectory.close();
}
+ if (_zkClientForListener != null) {
+ _zkClientForListener.close();
+ }
+ }
+
+ @Override
+ public void handleChildChange(String parentPath, List<String> currentChilds) {
+ if (_zkClientForListener == null || _zkClientForListener.isClosed()) {
+ return;
+ }
+ // Resubscribe
+ _zkClientForListener.unsubscribeAll();
+ _zkClientForListener.subscribeRoutingDataChanges(this, this);
+ resetZkResources();
+ }
+
+ @Override
+ public void handleDataChange(String dataPath, Object data) {
+ if (_zkClientForListener == null || _zkClientForListener.isClosed()) {
+ return;
+ }
+ resetZkResources();
+ }
+
+ @Override
+ public void handleDataDeleted(String dataPath) {
+ // NOP because this is covered by handleChildChange()
+ }
+
+ @Override
+ public void handleStateChanged(Watcher.Event.KeeperState state) {
+ if (_zkClientForListener == null || _zkClientForListener.isClosed()) {
+ return;
+ }
+ // Resubscribe
+ _zkClientForListener.unsubscribeAll();
+ _zkClientForListener.subscribeRoutingDataChanges(this, this);
+ resetZkResources();
+ }
+
+ @Override
+ public void handleNewSession(String sessionId) {
+ if (_zkClientForListener == null || _zkClientForListener.isClosed()) {
+ return;
+ }
+ // Resubscribe
+ _zkClientForListener.unsubscribeAll();
+ _zkClientForListener.subscribeRoutingDataChanges(this, this);
+ resetZkResources();
+ }
+
+ @Override
+ public void handleSessionEstablishmentError(Throwable error) {
+ if (_zkClientForListener == null || _zkClientForListener.isClosed()) {
+ return;
+ }
+ // Resubscribe
+ _zkClientForListener.unsubscribeAll();
+ _zkClientForListener.subscribeRoutingDataChanges(this, this);
+ resetZkResources();
+ }
+
+ /**
+ * Resets all internally cached routing data by closing and nullifying the ZkClient and Helix APIs.
+ * This is okay because routing data update should be infrequent.
+ */
+ private void resetZkResources() {
+ synchronized (this) {
+ LOG.info("ServerContext: Resetting ZK resources due to routing data change! Routing ZK: {}",
+ _zkAddr);
+ try {
+ // Reset HttpRoutingDataReader's cache
+ HttpRoutingDataReader.reset();
+ // All Helix APIs will be closed implicitly because ZkClient is closed
+ if (_zkClient != null && !_zkClient.isClosed()) {
+ _zkClient.close();
+ }
+ if (_byteArrayZkBaseDataAccessor != null) {
+ _byteArrayZkBaseDataAccessor.close();
+ }
+ _zkClient = null;
+ _zkHelixAdmin = null;
+ _clusterSetup = null;
+ _configAccessor = null;
+ _byteArrayZkBaseDataAccessor = null;
+ _helixDataAccessorPool.clear();
+ _taskDriverPool.clear();
+ } catch (Exception e) {
+ LOG.error("Failed to reset ZkClient and Helix APIs in ServerContext!", e);
+ }
+ }
}
}
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/AbstractHelixResource.java b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/AbstractHelixResource.java
index f1bb583..487316b 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/AbstractHelixResource.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/AbstractHelixResource.java
@@ -24,15 +24,15 @@ import java.io.IOException;
import org.apache.helix.ConfigAccessor;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
-import org.apache.helix.zookeeper.impl.client.ZkClient;
-import org.apache.helix.zookeeper.api.client.HelixZkClient;
import org.apache.helix.rest.common.ContextPropertyKeys;
import org.apache.helix.rest.server.ServerContext;
import org.apache.helix.rest.server.resources.AbstractResource;
import org.apache.helix.task.TaskDriver;
import org.apache.helix.tools.ClusterSetup;
+import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.helix.zookeeper.impl.client.ZkClient;
/**
@@ -42,14 +42,14 @@ import org.apache.helix.tools.ClusterSetup;
*/
public class AbstractHelixResource extends AbstractResource {
- public HelixZkClient getHelixZkClient() {
+ public RealmAwareZkClient getRealmAwareZkClient() {
ServerContext serverContext = getServerContext();
- return serverContext.getHelixZkClient();
+ return serverContext.getRealmAwareZkClient();
}
@Deprecated
public ZkClient getZkClient() {
- return (ZkClient) getHelixZkClient();
+ return (ZkClient) getRealmAwareZkClient();
}
public HelixAdmin getHelixAdmin() {
@@ -74,7 +74,7 @@ public class AbstractHelixResource extends AbstractResource {
public HelixDataAccessor getDataAccssor(String clusterName) {
ServerContext serverContext = getServerContext();
- return serverContext.getDataAccssor(clusterName);
+ return serverContext.getDataAccessor(clusterName);
}
protected ZkBaseDataAccessor<byte[]> getByteArrayDataAccessor() {
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ClusterAccessor.java b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ClusterAccessor.java
index 6beeb2c..4ca775c 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ClusterAccessor.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ClusterAccessor.java
@@ -44,10 +44,7 @@ import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixException;
import org.apache.helix.PropertyKey;
import org.apache.helix.PropertyPathBuilder;
-import org.apache.helix.model.RESTConfig;
-import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.helix.manager.zk.ZKUtil;
-import org.apache.helix.zookeeper.api.client.HelixZkClient;
import org.apache.helix.model.CloudConfig;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.ControllerHistory;
@@ -55,12 +52,15 @@ import org.apache.helix.model.HelixConfigScope;
import org.apache.helix.model.LiveInstance;
import org.apache.helix.model.MaintenanceSignal;
import org.apache.helix.model.Message;
+import org.apache.helix.model.RESTConfig;
import org.apache.helix.model.StateModelDefinition;
import org.apache.helix.model.builder.HelixConfigScopeBuilder;
import org.apache.helix.rest.server.json.cluster.ClusterTopology;
import org.apache.helix.rest.server.service.ClusterService;
import org.apache.helix.rest.server.service.ClusterServiceImpl;
import org.apache.helix.tools.ClusterSetup;
+import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.codehaus.jackson.type.TypeReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -472,7 +472,7 @@ public class ClusterAccessor extends AbstractHelixResource {
LOG.error("Failed to deserialize user's input {}. Exception: {}.", content, e);
return badRequest("Input is not a valid ZNRecord!");
}
- HelixZkClient zkClient = getHelixZkClient();
+ RealmAwareZkClient zkClient = getRealmAwareZkClient();
String path = PropertyPathBuilder.stateModelDef(clusterId);
try {
ZKUtil.createChildren(zkClient, path, record);
@@ -656,7 +656,7 @@ public class ClusterAccessor extends AbstractHelixResource {
}
private boolean doesClusterExist(String cluster) {
- HelixZkClient zkClient = getHelixZkClient();
+ RealmAwareZkClient zkClient = getRealmAwareZkClient();
return ZKUtil.isClusterSetup(cluster, zkClient);
}
@@ -664,7 +664,7 @@ public class ClusterAccessor extends AbstractHelixResource {
@Path("{clusterId}/cloudconfig")
public Response addCloudConfig(@PathParam("clusterId") String clusterId, String content) {
- HelixZkClient zkClient = getHelixZkClient();
+ RealmAwareZkClient zkClient = getRealmAwareZkClient();
if (!ZKUtil.isClusterSetup(clusterId, zkClient)) {
return notFound("Cluster is not properly setup!");
}
@@ -696,7 +696,7 @@ public class ClusterAccessor extends AbstractHelixResource {
@Path("{clusterId}/cloudconfig")
public Response getCloudConfig(@PathParam("clusterId") String clusterId) {
- HelixZkClient zkClient = getHelixZkClient();
+ RealmAwareZkClient zkClient = getRealmAwareZkClient();
if (!ZKUtil.isClusterSetup(clusterId, zkClient)) {
return notFound();
}
@@ -724,7 +724,7 @@ public class ClusterAccessor extends AbstractHelixResource {
public Response updateCloudConfig(@PathParam("clusterId") String clusterId,
@QueryParam("command") String commandStr, String content) {
- HelixZkClient zkClient = getHelixZkClient();
+ RealmAwareZkClient zkClient = getRealmAwareZkClient();
if (!ZKUtil.isClusterSetup(clusterId, zkClient)) {
return notFound();
}
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ResourceAccessor.java b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ResourceAccessor.java
index ca2189e..b8c7c38 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ResourceAccessor.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ResourceAccessor.java
@@ -41,18 +41,18 @@ import org.apache.helix.ConfigAccessor;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixException;
import org.apache.helix.PropertyPathBuilder;
-import org.apache.helix.zookeeper.datamodel.ZNRecord;
-import org.apache.helix.zookeeper.api.client.HelixZkClient;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.HelixConfigScope;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.ResourceConfig;
import org.apache.helix.model.StateModelDefinition;
import org.apache.helix.model.builder.HelixConfigScopeBuilder;
-import org.codehaus.jackson.type.TypeReference;
+import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.codehaus.jackson.node.ArrayNode;
import org.codehaus.jackson.node.JsonNodeFactory;
import org.codehaus.jackson.node.ObjectNode;
+import org.codehaus.jackson.type.TypeReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -79,7 +79,7 @@ public class ResourceAccessor extends AbstractHelixResource {
ObjectNode root = JsonNodeFactory.instance.objectNode();
root.put(Properties.id.name(), JsonNodeFactory.instance.textNode(clusterId));
- HelixZkClient zkClient = getHelixZkClient();
+ RealmAwareZkClient zkClient = getRealmAwareZkClient();
ArrayNode idealStatesNode = root.putArray(ResourceProperties.idealStates.name());
ArrayNode externalViewsNode = root.putArray(ResourceProperties.externalViews.name());
@@ -109,7 +109,7 @@ public class ResourceAccessor extends AbstractHelixResource {
@Path("health")
public Response getResourceHealth(@PathParam("clusterId") String clusterId) {
- HelixZkClient zkClient = getHelixZkClient();
+ RealmAwareZkClient zkClient = getRealmAwareZkClient();
List<String> resourcesInIdealState =
zkClient.getChildren(PropertyPathBuilder.idealState(clusterId));
diff --git a/helix-rest/src/test/java/org/apache/helix/rest/metadatastore/integration/TestRoutingDataUpdate.java b/helix-rest/src/test/java/org/apache/helix/rest/metadatastore/integration/TestRoutingDataUpdate.java
new file mode 100644
index 0000000..0babf05
--- /dev/null
+++ b/helix-rest/src/test/java/org/apache/helix/rest/metadatastore/integration/TestRoutingDataUpdate.java
@@ -0,0 +1,176 @@
+package org.apache.helix.rest.metadatastore.integration;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import javax.ws.rs.client.Entity;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+
+import org.apache.helix.SystemPropertyKeys;
+import org.apache.helix.msdcommon.constant.MetadataStoreRoutingConstants;
+import org.apache.helix.rest.server.AbstractTestClass;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+/**
+ * TestRoutingDataUpdate tests that Helix REST server's ServerContext gets a proper update whenever
+ * there is change in the routing data.
+ */
+public class TestRoutingDataUpdate extends AbstractTestClass {
+ private static final String CLUSTER_0_SHARDING_KEY = "/TestRoutingDataUpdate-cluster-0";
+ private static final String CLUSTER_1_SHARDING_KEY = "/TestRoutingDataUpdate-cluster-1";
+ private final Map<String, List<String>> _routingData = new HashMap<>();
+
+ @BeforeClass
+ public void beforeClass() {
+ // Advertise this test's REST server as the MSDS host and port
+ String msdsHost = getBaseUri().getHost();
+ String msdsPort = Integer.toString(getBaseUri().getPort());
+ System.setProperty(MetadataStoreRoutingConstants.MSDS_SERVER_HOSTNAME_KEY, msdsHost);
+ System.setProperty(MetadataStoreRoutingConstants.MSDS_SERVER_PORT_KEY, msdsPort);
+
+ // Turn on the multi-zk config
+ System.setProperty(SystemPropertyKeys.MULTI_ZK_ENABLED, "true");
+
+ // The base URI of the REST server is also used as the MSDS endpoint
+ System.setProperty(MetadataStoreRoutingConstants.MSDS_SERVER_ENDPOINT_KEY,
+ getBaseUri().toString());
+
+ // A restart is needed so that a fresh ServerContext gets created
+ restartRestServer();
+ }
+
+ @AfterClass
+ public void afterClass() {
+ // Clear every system property set in beforeClass(), including the MSDS endpoint, so that
+ // the multi-zk settings of this test do not leak into tests that run afterwards
+ System.clearProperty(MetadataStoreRoutingConstants.MSDS_SERVER_HOSTNAME_KEY);
+ System.clearProperty(MetadataStoreRoutingConstants.MSDS_SERVER_PORT_KEY);
+ System.clearProperty(MetadataStoreRoutingConstants.MSDS_SERVER_ENDPOINT_KEY);
+ System.clearProperty(SystemPropertyKeys.MULTI_ZK_ENABLED);
+
+ // Restart the REST server now that the properties above have been cleared
+ restartRestServer();
+ }
+
+ /**
+ * Updates the routing data several times through the REST endpoint and, after each update,
+ * checks through cluster creation and deletion that the REST server follows the new mapping.
+ */
+ @Test
+ public void testRoutingDataUpdate() throws Exception {
+ // Set up routing data: both sharding keys map to the first ZK, none to the second ZK
+ _routingData.put(ZK_ADDR, Arrays.asList(CLUSTER_0_SHARDING_KEY, CLUSTER_1_SHARDING_KEY));
+ _routingData.put(_zkAddrTestNS, new ArrayList<>());
+ String routingDataString = OBJECT_MAPPER.writeValueAsString(_routingData);
+ put(MetadataStoreRoutingConstants.MSDS_GET_ALL_ROUTING_DATA_ENDPOINT, null,
+ Entity.entity(routingDataString, MediaType.APPLICATION_JSON_TYPE),
+ Response.Status.CREATED.getStatusCode());
+
+ // Wait so that ServerContext can process the routing data change callback.
+ // TODO: Think of another way to wait -
+ // the fixed sleep is only used because of the nature of the testing environment.
+ // In production, the server might return a 500 if an HTTP call comes in before the callbacks
+ // get processed fully.
+ Thread.sleep(500L);
+
+ // Create the first cluster using Helix REST API via ClusterAccessor
+ put("/clusters" + CLUSTER_0_SHARDING_KEY, null,
+ Entity.entity("", MediaType.APPLICATION_JSON_TYPE),
+ Response.Status.CREATED.getStatusCode());
+ // Check that the first cluster is created in the first ZK as designated by routing data
+ Assert.assertTrue(_gZkClient.exists(CLUSTER_0_SHARDING_KEY));
+ Assert.assertFalse(_gZkClientTestNS.exists(CLUSTER_0_SHARDING_KEY));
+
+ // Change the routing data mapping so that CLUSTER_1 points to the second ZK
+ _routingData.clear();
+ _routingData.put(ZK_ADDR, Collections.singletonList(CLUSTER_0_SHARDING_KEY));
+ _routingData.put(_zkAddrTestNS, Collections.singletonList(CLUSTER_1_SHARDING_KEY));
+ routingDataString = OBJECT_MAPPER.writeValueAsString(_routingData);
+ put(MetadataStoreRoutingConstants.MSDS_GET_ALL_ROUTING_DATA_ENDPOINT, null,
+ Entity.entity(routingDataString, MediaType.APPLICATION_JSON_TYPE),
+ Response.Status.CREATED.getStatusCode());
+
+ // Wait so that ServerContext can process the routing data change callback.
+ // TODO: Think of another way to wait -
+ // the fixed sleep is only used because of the nature of the testing environment.
+ // In production, the server might return a 500 if an HTTP call comes in before the callbacks
+ // get processed fully.
+ Thread.sleep(500L);
+
+ // Create the second cluster using Helix REST API via ClusterAccessor
+ put("/clusters" + CLUSTER_1_SHARDING_KEY, null,
+ Entity.entity("", MediaType.APPLICATION_JSON_TYPE),
+ Response.Status.CREATED.getStatusCode());
+ // Check that the second cluster is created in the second ZK as designated by routing data
+ Assert.assertTrue(_gZkClientTestNS.exists(CLUSTER_1_SHARDING_KEY));
+ Assert.assertFalse(_gZkClient.exists(CLUSTER_1_SHARDING_KEY));
+
+ // Remove all routing data by writing an empty mapping
+ put(MetadataStoreRoutingConstants.MSDS_GET_ALL_ROUTING_DATA_ENDPOINT, null, Entity
+ .entity(OBJECT_MAPPER.writeValueAsString(Collections.emptyMap()),
+ MediaType.APPLICATION_JSON_TYPE), Response.Status.CREATED.getStatusCode());
+
+ // Wait so that ServerContext can process the routing data change callback.
+ // TODO: Think of another way to wait -
+ // the fixed sleep is only used because of the nature of the testing environment.
+ // In production, the server might return a 500 if an HTTP call comes in before the callbacks
+ // get processed fully.
+ Thread.sleep(500L);
+
+ // Delete clusters - both should fail because the routing data no longer has these clusters
+ delete("/clusters" + CLUSTER_0_SHARDING_KEY,
+ Response.Status.INTERNAL_SERVER_ERROR.getStatusCode());
+ delete("/clusters" + CLUSTER_1_SHARDING_KEY,
+ Response.Status.INTERNAL_SERVER_ERROR.getStatusCode());
+
+ // Set the routing data again (the last non-empty mapping serialized above)
+ put(MetadataStoreRoutingConstants.MSDS_GET_ALL_ROUTING_DATA_ENDPOINT, null,
+ Entity.entity(routingDataString, MediaType.APPLICATION_JSON_TYPE),
+ Response.Status.CREATED.getStatusCode());
+
+ // Wait so that ServerContext can process the routing data change callback.
+ // TODO: Think of another way to wait -
+ // the fixed sleep is only used because of the nature of the testing environment.
+ // In production, the server might return a 500 if an HTTP call comes in before the callbacks
+ // get processed fully.
+ Thread.sleep(500L);
+
+ // Attempt deletion again - now they should succeed
+ delete("/clusters" + CLUSTER_0_SHARDING_KEY, Response.Status.OK.getStatusCode());
+ delete("/clusters" + CLUSTER_1_SHARDING_KEY, Response.Status.OK.getStatusCode());
+
+ // Double-verify using ZkClients
+ Assert.assertFalse(_gZkClientTestNS.exists(CLUSTER_1_SHARDING_KEY));
+ Assert.assertFalse(_gZkClient.exists(CLUSTER_0_SHARDING_KEY));
+
+ // Remove all routing data so that nothing written by this test is left behind
+ put(MetadataStoreRoutingConstants.MSDS_GET_ALL_ROUTING_DATA_ENDPOINT, null, Entity
+ .entity(OBJECT_MAPPER.writeValueAsString(Collections.emptyMap()),
+ MediaType.APPLICATION_JSON_TYPE), Response.Status.CREATED.getStatusCode());
+ }
+
+ /**
+ * Shuts down the current Helix REST server, if there is one, and replaces it with a newly
+ * started server obtained from startRestServer().
+ */
+ private void restartRestServer() {
+ if (_helixRestServer != null) {
+ _helixRestServer.shutdown();
+ }
+ _helixRestServer = startRestServer();
+ }
+}
diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/AbstractTestClass.java b/helix-rest/src/test/java/org/apache/helix/rest/server/AbstractTestClass.java
index c5ffd41..0fee10d 100644
--- a/helix-rest/src/test/java/org/apache/helix/rest/server/AbstractTestClass.java
+++ b/helix-rest/src/test/java/org/apache/helix/rest/server/AbstractTestClass.java
@@ -189,21 +189,7 @@ public class AbstractTestClass extends JerseyTestNg.ContainerPerClassTest {
@Override
public void start() {
if (_helixRestServer == null) {
- // Create namespace manifest map
- List<HelixRestNamespace> namespaces = new ArrayList<>();
- // Add test namespace
- namespaces.add(new HelixRestNamespace(TEST_NAMESPACE,
- HelixRestNamespace.HelixMetadataStoreType.ZOOKEEPER, _zkAddrTestNS, false));
- // Add default namesapce
- namespaces.add(new HelixRestNamespace(ZK_ADDR));
- try {
- _helixRestServer =
- new HelixRestServer(namespaces, baseUri.getPort(), baseUri.getPath(),
- Collections.singletonList(_auditLogger));
- _helixRestServer.start();
- } catch (Exception ex) {
- throw new TestContainerException(ex);
- }
+ _helixRestServer = startRestServer();
}
}
@@ -584,4 +570,28 @@ public class AbstractTestClass extends JerseyTestNg.ContainerPerClassTest {
_clusters.add(STOPPABLE_CLUSTER);
_workflowMap.put(STOPPABLE_CLUSTER, createWorkflows(STOPPABLE_CLUSTER, 3));
}
+
+ /**
+ * Creates and starts a HelixRestServer for the test suite, serving the test namespace and the
+ * default namespace.
+ * @return the started HelixRestServer
+ * @throws TestContainerException if the server cannot be created or started
+ */
+ protected HelixRestServer startRestServer() {
+ // Namespace manifest: the test namespace first, then the default namespace
+ List<HelixRestNamespace> namespaces = new ArrayList<>();
+ namespaces.add(new HelixRestNamespace(TEST_NAMESPACE,
+ HelixRestNamespace.HelixMetadataStoreType.ZOOKEEPER, _zkAddrTestNS, false));
+ namespaces.add(new HelixRestNamespace(ZK_ADDR));
+ try {
+ HelixRestServer server = new HelixRestServer(namespaces, getBaseUri().getPort(),
+ getBaseUri().getPath(), Collections.singletonList(_auditLogger));
+ server.start();
+ return server;
+ } catch (Exception ex) {
+ // Surface any construction or startup failure as a test container failure
+ throw new TestContainerException(ex);
+ }
+ }
}
diff --git a/metadata-store-directory-common/src/main/java/org/apache/helix/msdcommon/constant/MetadataStoreRoutingConstants.java b/metadata-store-directory-common/src/main/java/org/apache/helix/msdcommon/constant/MetadataStoreRoutingConstants.java
index 41d5011..6cceb50 100644
--- a/metadata-store-directory-common/src/main/java/org/apache/helix/msdcommon/constant/MetadataStoreRoutingConstants.java
+++ b/metadata-store-directory-common/src/main/java/org/apache/helix/msdcommon/constant/MetadataStoreRoutingConstants.java
@@ -22,6 +22,8 @@ package org.apache.helix.msdcommon.constant;
public class MetadataStoreRoutingConstants {
public static final String ROUTING_DATA_PATH = "/METADATA_STORE_ROUTING_DATA";
+ public static final String ROUTING_ZK_ADDRESS_KEY = "ROUTING_ZK_ADDRESS";
+
// For ZK only
public static final String ZNRECORD_LIST_FIELD_KEY = "ZK_PATH_SHARDING_KEYS";
diff --git a/metadata-store-directory-common/src/main/java/org/apache/helix/msdcommon/util/ZkValidationUtil.java b/metadata-store-directory-common/src/main/java/org/apache/helix/msdcommon/util/ZkValidationUtil.java
index 472b3d9..ab8258d 100644
--- a/metadata-store-directory-common/src/main/java/org/apache/helix/msdcommon/util/ZkValidationUtil.java
+++ b/metadata-store-directory-common/src/main/java/org/apache/helix/msdcommon/util/ZkValidationUtil.java
@@ -27,12 +27,13 @@ public class ZkValidationUtil {
* /
* /abc
* /abc/abc/abc/abc
+ * /abc/localhost:1234
* Invalid matches:
* null or empty string
* /abc/
* /abc/abc/abc/abc/
**/
public static boolean isPathValid(String path) {
- return path.matches("^/|(/[\\w-]+)+$");
+ // null is documented above as an invalid path: return false for it instead of throwing an NPE
+ return path != null && path.matches("^/|(/[\\w?:-]+)+$");
}
}
diff --git a/pom.xml b/pom.xml
index e8c9bd3..60f8d87 100644
--- a/pom.xml
+++ b/pom.xml
@@ -630,40 +630,6 @@ under the License.
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>3.0.0-M3</version>
- <executions>
- <!--
- "executions" enables multiple runs of integration test suites. This is to enable two
- runs: 1. run in a single-ZK environment, 2. run in a multi-ZK environment.
- "multiZk" is the config accessible via Systems.Properties so that the two runs could be
- differentiated.
- -->
- <execution>
- <goals>
- <goal>test</goal>
- </goals>
- <id>default-test</id>
- <phase>test</phase>
- <configuration>
- <rerunFailingTestsCount>3</rerunFailingTestsCount>
- <skipAfterFailureCount>10</skipAfterFailureCount>
- </configuration>
- </execution>
- <execution>
- <goals>
- <goal>test</goal>
- </goals>
- <id>multi-zk</id>
- <phase>test</phase>
- <configuration>
- <systemPropertyVariables>
- <multiZk>true</multiZk>
- <numZk>3</numZk>
- </systemPropertyVariables>
- <rerunFailingTestsCount>3</rerunFailingTestsCount>
- <skipAfterFailureCount>10</skipAfterFailureCount>
- </configuration>
- </execution>
- </executions>
</plugin>
<plugin>
<groupId>org.apache.rat</groupId>
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/client/RealmAwareZkClient.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/client/RealmAwareZkClient.java
index 0e461b7..ee8c8e3 100644
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/client/RealmAwareZkClient.java
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/client/RealmAwareZkClient.java
@@ -23,6 +23,7 @@ import java.io.IOException;
import java.util.List;
import java.util.concurrent.TimeUnit;
+import org.apache.helix.msdcommon.constant.MetadataStoreRoutingConstants;
import org.apache.helix.msdcommon.exception.InvalidRoutingDataException;
import org.apache.helix.zookeeper.exception.ZkClientException;
import org.apache.helix.zookeeper.util.HttpRoutingDataReader;
@@ -579,4 +580,19 @@ public interface RealmAwareZkClient {
.setConnectInitTimeout(_connectInitTimeout);
}
}
+
+ /**
+ * Subscribes to the routing data paths using the provided ZkClient.
+ * Note: this method assumes that the routing data path has already been created.
+ * @param childListener
+ * @param dataListener
+ */
+ default void subscribeRoutingDataChanges(IZkChildListener childListener,
+ IZkDataListener dataListener) {
+ subscribeChildChanges(MetadataStoreRoutingConstants.ROUTING_DATA_PATH, childListener);
+ for (String child : getChildren(MetadataStoreRoutingConstants.ROUTING_DATA_PATH)) {
+ subscribeDataChanges(MetadataStoreRoutingConstants.ROUTING_DATA_PATH + "/" + child,
+ dataListener);
+ }
+ }
}
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/util/HttpRoutingDataReader.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/util/HttpRoutingDataReader.java
index f2f907a..b214cc4 100644
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/util/HttpRoutingDataReader.java
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/util/HttpRoutingDataReader.java
@@ -87,7 +87,7 @@ public class HttpRoutingDataReader {
synchronized (HttpRoutingDataReader.class) {
rawRoutingData = _rawRoutingDataMap.get(msdsEndpoint);
if (rawRoutingData == null) {
- String routingDataJson = getAllRoutingData();
+ String routingDataJson = getAllRoutingData(msdsEndpoint);
// Update the reference if reading routingData over HTTP is successful
rawRoutingData = parseRoutingData(routingDataJson);
_rawRoutingDataMap.put(msdsEndpoint, rawRoutingData);
@@ -136,16 +136,24 @@ public class HttpRoutingDataReader {
}
/**
+ * Clears the statically-cached routing data in HttpRoutingDataReader.
+ * Both static caches are emptied: the raw routing data map and the metadata store routing
+ * data map.
+ * NOTE(review): this method does not take the HttpRoutingDataReader.class lock that the
+ * routing data getter uses when populating the cache - confirm that the maps are safe to clear
+ * concurrently.
+ */
+ public static void reset() {
+ _rawRoutingDataMap.clear();
+ _metadataStoreRoutingDataMap.clear();
+ }
+
+ /**
* Makes an HTTP call to fetch all routing data.
* @return
* @throws IOException
*/
- private static String getAllRoutingData() throws IOException {
+ private static String getAllRoutingData(String msdsEndpoint) throws IOException {
// Note that MSDS_ENDPOINT should provide high-availability - it risks becoming a single point
// of failure if it's backed by a single IP address/host
// Retry count is 3 by default.
HttpGet requestAllData = new HttpGet(
- SYSTEM_MSDS_ENDPOINT + MetadataStoreRoutingConstants.MSDS_GET_ALL_ROUTING_DATA_ENDPOINT);
+ msdsEndpoint + MetadataStoreRoutingConstants.MSDS_GET_ALL_ROUTING_DATA_ENDPOINT);
// Define timeout configs
RequestConfig config = RequestConfig.custom().setConnectTimeout(HTTP_TIMEOUT_IN_MS)
diff --git a/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/RealmAwareZkClientTestBase.java b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/RealmAwareZkClientTestBase.java
index 900c79f..acb2299 100644
--- a/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/RealmAwareZkClientTestBase.java
+++ b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/RealmAwareZkClientTestBase.java
@@ -43,7 +43,7 @@ public abstract class RealmAwareZkClientTestBase extends ZkTestBase {
@BeforeClass
public void beforeClass() throws IOException, InvalidRoutingDataException {
- // Create a mock MSDS so that HttpRoudingDataReader could fetch the routing data
+ // Create a mock MSDS so that HttpRoutingDataReader could fetch the routing data
if (_msdsServer == null) {
// Do not create again if Mock MSDS server has already been created by other tests
_msdsServer = new MockMetadataStoreDirectoryServer(MSDS_HOSTNAME, MSDS_PORT, MSDS_NAMESPACE,