You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by hu...@apache.org on 2020/04/01 22:47:04 UTC
[helix] 09/49: Add MetadataStoreRoutingDataWriter with
DistributedLeaderElection (#727)
This is an automated email from the ASF dual-hosted git repository.
hulee pushed a commit to branch zooscalability
in repository https://gitbox.apache.org/repos/asf/helix.git
commit fbeefcfb218ce08344c2cdd76b1028a6c3caa1fe
Author: Hunter Lee <hu...@linkedin.com>
AuthorDate: Mon Feb 10 16:57:58 2020 -0800
Add MetadataStoreRoutingDataWriter with DistributedLeaderElection (#727)
We need a separate ZkClient-based writer that could allow users to write routing data to ZK. This diff adds such an interface, an implementation, and a distributed lock implementation that could help users to manipulate the routing data.
Changelist:
Add ZkRoutingDataWriter (+ interface)
Add ZkDistributedLock (+ interface) to guarantee that there's at most one active writer at a time (where there are multiple Helix REST deployables)
Add a test for ZkRoutingDataWriter
Integrate ZkRoutingDataWriter with ZkMetadataStoreDirectory
Add test methods to TestZkMetadataStoreDirectory
Add ZkDistributedElection to replace ZkDistributedLock (and move ZkDistributedLock to a separate PR)
---
.../metadatastore/ZkMetadataStoreDirectory.java | 62 +++--
.../MetadataStoreRoutingDataReader.java | 3 +-
.../accessor/MetadataStoreRoutingDataWriter.java | 74 ++++++
.../{ => accessor}/ZkRoutingDataReader.java | 25 +-
.../accessor/ZkRoutingDataWriter.java | 253 +++++++++++++++++++++
.../concurrency/ZkDistributedLeaderElection.java | 142 ++++++++++++
.../constant/MetadataStoreRoutingConstants.java | 3 +
.../{ => accessor}/TestZkRoutingDataReader.java | 2 +-
.../accessor/TestZkRoutingDataWriter.java | 107 +++++++++
.../helix/rest/server/AbstractTestClass.java | 9 -
10 files changed, 627 insertions(+), 53 deletions(-)
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/metadatastore/ZkMetadataStoreDirectory.java b/helix-rest/src/main/java/org/apache/helix/rest/metadatastore/ZkMetadataStoreDirectory.java
index 5a88ca9..536d058 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/metadatastore/ZkMetadataStoreDirectory.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/metadatastore/ZkMetadataStoreDirectory.java
@@ -29,16 +29,11 @@ import java.util.NoSuchElementException;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
-import org.I0Itec.zkclient.IZkChildListener;
-import org.I0Itec.zkclient.IZkDataListener;
-import org.apache.helix.HelixManagerProperties;
-import org.apache.helix.manager.zk.ZNRecordSerializer;
-import org.apache.helix.manager.zk.client.DedicatedZkClientFactory;
-import org.apache.helix.manager.zk.client.HelixZkClient;
-import org.apache.helix.manager.zk.client.HelixZkClient.ZkClientConfig;
-import org.apache.helix.manager.zk.zookeeper.IZkStateListener;
+import org.apache.helix.rest.metadatastore.accessor.MetadataStoreRoutingDataReader;
+import org.apache.helix.rest.metadatastore.accessor.MetadataStoreRoutingDataWriter;
+import org.apache.helix.rest.metadatastore.accessor.ZkRoutingDataReader;
+import org.apache.helix.rest.metadatastore.accessor.ZkRoutingDataWriter;
import org.apache.helix.rest.metadatastore.exceptions.InvalidRoutingDataException;
-import org.apache.zookeeper.Watcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -52,6 +47,7 @@ public class ZkMetadataStoreDirectory implements MetadataStoreDirectory, Routing
// TODO: enable the line below when implementation is complete
// The following maps' keys represent the namespace
private final Map<String, MetadataStoreRoutingDataReader> _routingDataReaderMap;
+ private final Map<String, MetadataStoreRoutingDataWriter> _routingDataWriterMap;
private final Map<String, MetadataStoreRoutingData> _routingDataMap;
private final Map<String, String> _routingZkAddressMap;
// <namespace, <realm, <list of sharding keys>> mappings
@@ -68,14 +64,17 @@ public class ZkMetadataStoreDirectory implements MetadataStoreDirectory, Routing
throw new InvalidRoutingDataException("Routing ZK Addresses given are invalid!");
}
_routingDataReaderMap = new HashMap<>();
+ _routingDataWriterMap = new HashMap<>();
_routingZkAddressMap = routingZkAddressMap;
_realmToShardingKeysMap = new ConcurrentHashMap<>();
_routingDataMap = new ConcurrentHashMap<>();
- // Create RoutingDataReaders
+ // Create RoutingDataReaders and RoutingDataWriters
for (Map.Entry<String, String> routingEntry : _routingZkAddressMap.entrySet()) {
_routingDataReaderMap.put(routingEntry.getKey(),
new ZkRoutingDataReader(routingEntry.getKey(), routingEntry.getValue(), this));
+ _routingDataWriterMap.put(routingEntry.getKey(),
+ new ZkRoutingDataWriter(routingEntry.getKey(), routingEntry.getValue()));
// Populate realmToShardingKeys with ZkRoutingDataReader
_realmToShardingKeysMap.put(routingEntry.getKey(),
@@ -132,26 +131,38 @@ public class ZkMetadataStoreDirectory implements MetadataStoreDirectory, Routing
@Override
public boolean addMetadataStoreRealm(String namespace, String realm) {
- // TODO implement when MetadataStoreRoutingDataWriter is ready
- throw new UnsupportedOperationException();
+ // A writer is registered per namespace; a namespace without one is rejected
+ MetadataStoreRoutingDataWriter namespaceWriter = _routingDataWriterMap.get(namespace);
+ if (namespaceWriter == null) {
+ throw new IllegalArgumentException(
+ "Failed to add metadata store realm: Namespace " + namespace + " is not found!");
+ }
+ return namespaceWriter.addMetadataStoreRealm(realm);
}
@Override
public boolean deleteMetadataStoreRealm(String namespace, String realm) {
- // TODO implement when MetadataStoreRoutingDataWriter is ready
- throw new UnsupportedOperationException();
+ // A writer is registered per namespace; a namespace without one is rejected
+ MetadataStoreRoutingDataWriter namespaceWriter = _routingDataWriterMap.get(namespace);
+ if (namespaceWriter == null) {
+ throw new IllegalArgumentException(
+ "Failed to delete metadata store realm: Namespace " + namespace + " is not found!");
+ }
+ return namespaceWriter.deleteMetadataStoreRealm(realm);
}
@Override
public boolean addShardingKey(String namespace, String realm, String shardingKey) {
- // TODO implement when MetadataStoreRoutingDataWriter is ready
- throw new UnsupportedOperationException();
+ // A writer is registered per namespace; a namespace without one is rejected
+ MetadataStoreRoutingDataWriter namespaceWriter = _routingDataWriterMap.get(namespace);
+ if (namespaceWriter == null) {
+ throw new IllegalArgumentException(
+ "Failed to add sharding key: Namespace " + namespace + " is not found!");
+ }
+ return namespaceWriter.addShardingKey(realm, shardingKey);
}
@Override
public boolean deleteShardingKey(String namespace, String realm, String shardingKey) {
- // TODO implement when MetadataStoreRoutingDataWriter is ready
- throw new UnsupportedOperationException();
+ // A writer is registered per namespace; a namespace without one is rejected
+ MetadataStoreRoutingDataWriter namespaceWriter = _routingDataWriterMap.get(namespace);
+ if (namespaceWriter == null) {
+ throw new IllegalArgumentException(
+ "Failed to delete sharding key: Namespace " + namespace + " is not found!");
+ }
+ return namespaceWriter.deleteShardingKey(realm, shardingKey);
}
/**
@@ -165,20 +176,20 @@ public class ZkMetadataStoreDirectory implements MetadataStoreDirectory, Routing
*/
@Override
public void refreshRoutingData(String namespace) {
- // Safe to ignore the callback if any of the mapping is null.
+ // Safe to ignore the callback if any of the maps are null.
// If routingDataMap is null, then it will be populated by the constructor anyway
// If routingDataMap is not null, then it's safe for the callback function to update it
- if (_routingZkAddressMap == null || _routingDataMap == null
- || _realmToShardingKeysMap == null) {
- LOG.error("Construction is not completed! ");
+ if (_routingZkAddressMap == null || _routingDataMap == null || _realmToShardingKeysMap == null
+ || _routingDataReaderMap == null || _routingDataWriterMap == null) {
+ LOG.warn(
+ "refreshRoutingData callback called before ZKMetadataStoreDirectory was fully initialized. Skipping refresh!");
return;
}
// Check if namespace exists; otherwise, return as a NOP and log it
if (!_routingZkAddressMap.containsKey(namespace)) {
- LOG.error("Failed to refresh internally-cached routing data! Namespace not found: {}",
- namespace);
- return;
+ LOG.error(
+ "Failed to refresh internally-cached routing data! Namespace not found: " + namespace);
}
try {
@@ -197,5 +208,6 @@ public class ZkMetadataStoreDirectory implements MetadataStoreDirectory, Routing
@Override
public synchronized void close() {
_routingDataReaderMap.values().forEach(MetadataStoreRoutingDataReader::close);
+ _routingDataWriterMap.values().forEach(MetadataStoreRoutingDataWriter::close);
}
}
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/metadatastore/MetadataStoreRoutingDataReader.java b/helix-rest/src/main/java/org/apache/helix/rest/metadatastore/accessor/MetadataStoreRoutingDataReader.java
similarity index 93%
rename from helix-rest/src/main/java/org/apache/helix/rest/metadatastore/MetadataStoreRoutingDataReader.java
rename to helix-rest/src/main/java/org/apache/helix/rest/metadatastore/accessor/MetadataStoreRoutingDataReader.java
index 3cc9a06..f19e8ff 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/metadatastore/MetadataStoreRoutingDataReader.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/metadatastore/accessor/MetadataStoreRoutingDataReader.java
@@ -1,4 +1,4 @@
-package org.apache.helix.rest.metadatastore;
+package org.apache.helix.rest.metadatastore.accessor;
/*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -26,6 +26,7 @@ import org.apache.helix.rest.metadatastore.exceptions.InvalidRoutingDataExceptio
/**
* An interface for a DAO that fetches routing data from a source and return a key-value mapping
* that represent the said routing data.
+ * Note: Each data reader connects to a single namespace.
*/
public interface MetadataStoreRoutingDataReader {
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/metadatastore/accessor/MetadataStoreRoutingDataWriter.java b/helix-rest/src/main/java/org/apache/helix/rest/metadatastore/accessor/MetadataStoreRoutingDataWriter.java
new file mode 100644
index 0000000..349bbd0
--- /dev/null
+++ b/helix-rest/src/main/java/org/apache/helix/rest/metadatastore/accessor/MetadataStoreRoutingDataWriter.java
@@ -0,0 +1,74 @@
+package org.apache.helix.rest.metadatastore.accessor;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.List;
+import java.util.Map;
+
+
+/**
+ * An interface for a DAO that writes to the metadata store that stores routing data.
+ * Note: Each data writer connects to a single namespace.
+ */
+public interface MetadataStoreRoutingDataWriter {
+
+  /**
+   * Creates a realm. If the namespace does not exist, it creates one.
+   * @param realm name of the realm to create
+   * @return true if successful or if the realm already exists. false otherwise.
+   */
+  boolean addMetadataStoreRealm(String realm);
+
+  /**
+   * Deletes a realm.
+   * @param realm name of the realm to delete
+   * @return true if successful or the realm or namespace does not exist. false otherwise.
+   */
+  boolean deleteMetadataStoreRealm(String realm);
+
+  /**
+   * Creates a mapping between the sharding key to the realm. If realm doesn't exist, it will be
+   * created (this call is idempotent).
+   * @param realm name of the realm the sharding key is mapped to
+   * @param shardingKey sharding key to add to the realm
+   * @return false if failed
+   */
+  boolean addShardingKey(String realm, String shardingKey);
+
+  /**
+   * Deletes the mapping between the sharding key to the realm.
+   * @param realm name of the realm the sharding key is currently mapped to
+   * @param shardingKey sharding key to remove from the realm
+   * @return false if failed; true if the deletion is successful or the key does not exist.
+   */
+  boolean deleteShardingKey(String realm, String shardingKey);
+
+  /**
+   * Sets (overwrites) the routing data with the given {@code <realm, list of sharding keys>}
+   * mapping.
+   * WARNING: This overwrites all existing routing data. Use with care!
+   * @param routingData mapping from each realm name to the list of sharding keys of that realm
+   * @return true if the given routing data was written; false if failed
+   */
+  boolean setRoutingData(Map<String, List<String>> routingData);
+
+  /**
+   * Closes any stateful resources such as connections or threads.
+   */
+  void close();
+}
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/metadatastore/ZkRoutingDataReader.java b/helix-rest/src/main/java/org/apache/helix/rest/metadatastore/accessor/ZkRoutingDataReader.java
similarity index 92%
rename from helix-rest/src/main/java/org/apache/helix/rest/metadatastore/ZkRoutingDataReader.java
rename to helix-rest/src/main/java/org/apache/helix/rest/metadatastore/accessor/ZkRoutingDataReader.java
index 453180f..ea8c290 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/metadatastore/ZkRoutingDataReader.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/metadatastore/accessor/ZkRoutingDataReader.java
@@ -1,4 +1,4 @@
-package org.apache.helix.rest.metadatastore;
+package org.apache.helix.rest.metadatastore.accessor;
/*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -31,6 +31,7 @@ import org.apache.helix.manager.zk.ZNRecordSerializer;
import org.apache.helix.manager.zk.client.DedicatedZkClientFactory;
import org.apache.helix.manager.zk.client.HelixZkClient;
import org.apache.helix.manager.zk.zookeeper.IZkStateListener;
+import org.apache.helix.rest.metadatastore.RoutingDataListener;
import org.apache.helix.rest.metadatastore.constant.MetadataStoreRoutingConstants;
import org.apache.helix.rest.metadatastore.exceptions.InvalidRoutingDataException;
import org.apache.zookeeper.Watcher;
@@ -42,10 +43,6 @@ public class ZkRoutingDataReader implements MetadataStoreRoutingDataReader, IZkD
private final HelixZkClient _zkClient;
private final RoutingDataListener _routingDataListener;
- public ZkRoutingDataReader(String namespace, String zkAddress) {
- this(namespace, zkAddress, null);
- }
-
public ZkRoutingDataReader(String namespace, String zkAddress,
RoutingDataListener routingDataListener) {
if (namespace == null || namespace.isEmpty()) {
@@ -115,8 +112,7 @@ public class ZkRoutingDataReader implements MetadataStoreRoutingDataReader, IZkD
}
@Override
- public synchronized void handleDataChange(String s, Object o)
- throws Exception {
+ public synchronized void handleDataChange(String s, Object o) {
if (_zkClient.isClosed()) {
return;
}
@@ -124,8 +120,7 @@ public class ZkRoutingDataReader implements MetadataStoreRoutingDataReader, IZkD
}
@Override
- public synchronized void handleDataDeleted(String s)
- throws Exception {
+ public synchronized void handleDataDeleted(String s) {
if (_zkClient.isClosed()) {
return;
}
@@ -140,8 +135,7 @@ public class ZkRoutingDataReader implements MetadataStoreRoutingDataReader, IZkD
}
@Override
- public synchronized void handleChildChange(String s, List<String> list)
- throws Exception {
+ public synchronized void handleChildChange(String s, List<String> list) {
if (_zkClient.isClosed()) {
return;
}
@@ -156,8 +150,7 @@ public class ZkRoutingDataReader implements MetadataStoreRoutingDataReader, IZkD
}
@Override
- public synchronized void handleStateChanged(Watcher.Event.KeeperState state)
- throws Exception {
+ public synchronized void handleStateChanged(Watcher.Event.KeeperState state) {
if (_zkClient.isClosed()) {
return;
}
@@ -165,8 +158,7 @@ public class ZkRoutingDataReader implements MetadataStoreRoutingDataReader, IZkD
}
@Override
- public synchronized void handleNewSession(String sessionId)
- throws Exception {
+ public synchronized void handleNewSession(String sessionId) {
if (_zkClient.isClosed()) {
return;
}
@@ -174,8 +166,7 @@ public class ZkRoutingDataReader implements MetadataStoreRoutingDataReader, IZkD
}
@Override
- public synchronized void handleSessionEstablishmentError(Throwable error)
- throws Exception {
+ public synchronized void handleSessionEstablishmentError(Throwable error) {
if (_zkClient.isClosed()) {
return;
}
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/metadatastore/accessor/ZkRoutingDataWriter.java b/helix-rest/src/main/java/org/apache/helix/rest/metadatastore/accessor/ZkRoutingDataWriter.java
new file mode 100644
index 0000000..3e43202
--- /dev/null
+++ b/helix-rest/src/main/java/org/apache/helix/rest/metadatastore/accessor/ZkRoutingDataWriter.java
@@ -0,0 +1,253 @@
+package org.apache.helix.rest.metadatastore.accessor;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import org.I0Itec.zkclient.exception.ZkNodeExistsException;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.manager.zk.client.DedicatedZkClientFactory;
+import org.apache.helix.manager.zk.client.HelixZkClient;
+import org.apache.helix.rest.metadatastore.concurrency.ZkDistributedLeaderElection;
+import org.apache.helix.rest.metadatastore.constant.MetadataStoreRoutingConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * A {@link MetadataStoreRoutingDataWriter} that persists routing data in ZooKeeper.
+ * <p>
+ * Every realm is a child ZNode of {@link MetadataStoreRoutingConstants#ROUTING_DATA_PATH}. The
+ * ZNRecord stored in that ZNode keeps the realm's sharding keys in the list field named
+ * {@link MetadataStoreRoutingConstants#ZNRECORD_LIST_FIELD_KEY}.
+ * <p>
+ * Only the instance that currently holds leadership in {@link ZkDistributedLeaderElection} writes
+ * to ZooKeeper. On every other instance the write methods are placeholders that return true
+ * without touching ZooKeeper until forwarding the request to the leader is implemented.
+ */
+public class ZkRoutingDataWriter implements MetadataStoreRoutingDataWriter {
+  private static final Logger LOG = LoggerFactory.getLogger(ZkRoutingDataWriter.class);
+
+  private final String _namespace;
+  private final HelixZkClient _zkClient;
+  private final ZkDistributedLeaderElection _leaderElection;
+
+  /**
+   * Opens a dedicated ZkClient, makes sure the routing data root path exists and joins the
+   * leader election among writers.
+   * @param namespace namespace this writer belongs to; only used in log messages here
+   * @param zkAddress address of the ZooKeeper that stores the routing data
+   * @throws IllegalArgumentException if namespace or zkAddress is null or empty
+   */
+  public ZkRoutingDataWriter(String namespace, String zkAddress) {
+    if (namespace == null || namespace.isEmpty()) {
+      throw new IllegalArgumentException("namespace cannot be null or empty!");
+    }
+    if (zkAddress == null || zkAddress.isEmpty()) {
+      throw new IllegalArgumentException("Zk address cannot be null or empty!");
+    }
+    _namespace = namespace;
+
+    HelixZkClient zkClient = DedicatedZkClientFactory.getInstance()
+        .buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddress),
+            new HelixZkClient.ZkClientConfig().setZkSerializer(new ZNRecordSerializer()));
+    ZkDistributedLeaderElection leaderElection;
+    try {
+      // Ensure that ROUTING_DATA_PATH exists in ZK. If not, create
+      try {
+        zkClient.createPersistent(MetadataStoreRoutingConstants.ROUTING_DATA_PATH, true);
+      } catch (ZkNodeExistsException ignored) {
+        // The path is already there, which is all that is needed
+      }
+
+      // Get the hostname (REST endpoint) from System property
+      // TODO: Fill in when Helix REST implementations are ready
+      ZNRecord myServerInfo = new ZNRecord("dummy hostname");
+      leaderElection = new ZkDistributedLeaderElection(zkClient,
+          MetadataStoreRoutingConstants.LEADER_ELECTION_ZNODE, myServerInfo);
+    } catch (RuntimeException e) {
+      // Setup failed half-way: release the dedicated connection instead of leaking it
+      zkClient.close();
+      throw e;
+    }
+    _zkClient = zkClient;
+    _leaderElection = leaderElection;
+  }
+
+  /**
+   * Creates the realm ZNode if this instance is the leader.
+   * @throws IllegalStateException if this instance is the leader but its ZkClient is closed
+   */
+  @Override
+  public synchronized boolean addMetadataStoreRealm(String realm) {
+    if (!_leaderElection.isLeader()) {
+      // TODO: Forward the request to leader
+      return true;
+    }
+    ensureZkClientOpen();
+    return createZkRealm(realm);
+  }
+
+  /**
+   * Deletes the realm ZNode if this instance is the leader. A realm that does not exist counts
+   * as deleted.
+   * @throws IllegalStateException if this instance is the leader but its ZkClient is closed
+   */
+  @Override
+  public synchronized boolean deleteMetadataStoreRealm(String realm) {
+    if (!_leaderElection.isLeader()) {
+      // TODO: Forward the request to leader
+      return true;
+    }
+    ensureZkClientOpen();
+    return deleteZkNode(getRealmPath(realm));
+  }
+
+  /**
+   * Adds the sharding key to the realm's list of sharding keys if this instance is the leader,
+   * creating the realm first when it does not exist. Keys already stored for the realm are kept,
+   * and adding a key that is already present changes nothing.
+   * @throws IllegalStateException if this instance is the leader but its ZkClient is closed
+   */
+  @Override
+  public synchronized boolean addShardingKey(String realm, String shardingKey) {
+    if (!_leaderElection.isLeader()) {
+      // TODO: Forward the request to leader
+      return true;
+    }
+    ensureZkClientOpen();
+
+    // If the realm does not exist already, then create the realm
+    String realmPath = getRealmPath(realm);
+    if (!_zkClient.exists(realmPath) && !createZkRealm(realm)) {
+      // Failed to create the realm - log and return false
+      LOG.error(
+          "Failed to add sharding key because ZkRealm creation failed! Namespace: {}, Realm: {}, Sharding key: {}",
+          _namespace, realm, shardingKey);
+      return false;
+    }
+
+    ZNRecord znRecord;
+    try {
+      znRecord = _zkClient.readData(realmPath);
+    } catch (Exception e) {
+      LOG.error(
+          "Failed to read the realm ZNRecord in addShardingKey()! Namespace: {}, Realm: {}, ShardingKey: {}",
+          _namespace, realm, shardingKey, e);
+      return false;
+    }
+    if (znRecord == null) {
+      // The realm ZNode exists but carries no data yet; start from an empty record
+      znRecord = new ZNRecord(realm);
+    }
+
+    List<String> existingKeys =
+        znRecord.getListField(MetadataStoreRoutingConstants.ZNRECORD_LIST_FIELD_KEY);
+    if (existingKeys != null && existingKeys.contains(shardingKey)) {
+      // Already mapped to this realm - nothing to write
+      return true;
+    }
+    // Append to a copy so the realm's other sharding keys survive the update
+    List<String> updatedKeys =
+        existingKeys == null ? new ArrayList<>() : new ArrayList<>(existingKeys);
+    updatedKeys.add(shardingKey);
+    znRecord.setListField(MetadataStoreRoutingConstants.ZNRECORD_LIST_FIELD_KEY, updatedKeys);
+    try {
+      _zkClient.writeData(realmPath, znRecord);
+    } catch (Exception e) {
+      LOG.error(
+          "Failed to write the realm ZNRecord in addShardingKey()! Namespace: {}, Realm: {}, ShardingKey: {}",
+          _namespace, realm, shardingKey, e);
+      return false;
+    }
+    return true;
+  }
+
+  /**
+   * Removes the sharding key from the realm's list of sharding keys if this instance is the
+   * leader. A missing realm, a realm without any sharding keys and a key that is not in the list
+   * all count as success.
+   * @throws IllegalStateException if this instance is the leader but its ZkClient is closed
+   */
+  @Override
+  public synchronized boolean deleteShardingKey(String realm, String shardingKey) {
+    if (!_leaderElection.isLeader()) {
+      // TODO: Forward the request to leader
+      return true;
+    }
+    ensureZkClientOpen();
+
+    String realmPath = getRealmPath(realm);
+    // Second argument true: a missing realm ZNode yields null instead of an exception
+    ZNRecord znRecord = _zkClient.readData(realmPath, true);
+    if (znRecord == null) {
+      // This realm does not exist. Return true!
+      return true;
+    }
+    List<String> existingKeys =
+        znRecord.getListField(MetadataStoreRoutingConstants.ZNRECORD_LIST_FIELD_KEY);
+    if (existingKeys == null || !existingKeys.contains(shardingKey)) {
+      // The realm has no sharding keys (e.g. freshly created) or not this one. Return true!
+      return true;
+    }
+
+    // Overwrite this ZNRecord with the sharding key removed
+    List<String> remainingKeys = new ArrayList<>(existingKeys);
+    remainingKeys.remove(shardingKey);
+    znRecord.setListField(MetadataStoreRoutingConstants.ZNRECORD_LIST_FIELD_KEY, remainingKeys);
+    try {
+      _zkClient.writeData(realmPath, znRecord);
+    } catch (Exception e) {
+      LOG.error(
+          "Failed to write the data back in deleteShardingKey()! Namespace: {}, Realm: {}, ShardingKey: {}",
+          _namespace, realm, shardingKey, e);
+      return false;
+    }
+    return true;
+  }
+
+  /**
+   * Replaces all routing data with the given mapping if this instance is the leader: every
+   * existing realm ZNode is deleted first, then one realm ZNode is written per map entry. The
+   * replacement is not atomic; a failure part-way returns false and leaves the routing data
+   * partially written.
+   * @throws IllegalStateException if this instance is the leader but its ZkClient is closed
+   * @throws IllegalArgumentException if this instance is the leader and routingData is null
+   */
+  @Override
+  public synchronized boolean setRoutingData(Map<String, List<String>> routingData) {
+    if (!_leaderElection.isLeader()) {
+      // TODO: Forward the request to leader
+      return true;
+    }
+    ensureZkClientOpen();
+    if (routingData == null) {
+      throw new IllegalArgumentException("routingData given is null!");
+    }
+
+    // Remove existing routing data
+    for (String zkRealm : _zkClient
+        .getChildren(MetadataStoreRoutingConstants.ROUTING_DATA_PATH)) {
+      if (!deleteZkNode(getRealmPath(zkRealm))) {
+        LOG.error(
+            "Failed to delete existing routing data in setRoutingData()! Namespace: {}, Realm: {}",
+            _namespace, zkRealm);
+        return false;
+      }
+    }
+
+    // For each ZkRealm, write the given routing data to ZooKeeper
+    for (Map.Entry<String, List<String>> routingDataEntry : routingData.entrySet()) {
+      String zkRealm = routingDataEntry.getKey();
+      List<String> shardingKeyList = routingDataEntry.getValue();
+
+      ZNRecord znRecord = new ZNRecord(zkRealm);
+      znRecord
+          .setListField(MetadataStoreRoutingConstants.ZNRECORD_LIST_FIELD_KEY, shardingKeyList);
+
+      String realmPath = getRealmPath(zkRealm);
+      try {
+        if (!_zkClient.exists(realmPath)) {
+          _zkClient.createPersistent(realmPath);
+        }
+        _zkClient.writeData(realmPath, znRecord);
+      } catch (Exception e) {
+        LOG.error("Failed to write data in setRoutingData()! Namespace: {}, Realm: {}",
+            _namespace, zkRealm, e);
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Closes the dedicated ZkClient owned by this writer.
+   */
+  @Override
+  public synchronized void close() {
+    _zkClient.close();
+  }
+
+  /**
+   * Fails fast when a write is attempted after {@link #close()}.
+   */
+  private void ensureZkClientOpen() {
+    if (_zkClient.isClosed()) {
+      throw new IllegalStateException("ZkClient is closed!");
+    }
+  }
+
+  /**
+   * Returns the path of the ZNode that stores the given realm.
+   */
+  private static String getRealmPath(String realm) {
+    return MetadataStoreRoutingConstants.ROUTING_DATA_PATH + "/" + realm;
+  }
+
+  /**
+   * Deletes the ZNode at the given path.
+   * @return true if the ZNode was deleted or is not there (anymore); false otherwise
+   */
+  private boolean deleteZkNode(String path) {
+    // delete() reports false for a ZNode that does not exist, which is still "deleted" for us
+    return _zkClient.delete(path) || !_zkClient.exists(path);
+  }
+
+  /**
+   * Creates a ZK realm ZNode and populates it with an empty ZNRecord if it doesn't exist already.
+   * @param realm name of the realm to create
+   * @return true if the realm exists after the call; false if creating or populating it failed
+   */
+  private boolean createZkRealm(String realm) {
+    String realmPath = getRealmPath(realm);
+    if (_zkClient.exists(realmPath)) {
+      LOG.warn("createZkRealm() called for realm: {}, but this realm already exists! Namespace: {}",
+          realm, _namespace);
+      return true;
+    }
+    try {
+      _zkClient.createPersistent(realmPath);
+      _zkClient.writeData(realmPath, new ZNRecord(realm));
+    } catch (Exception e) {
+      LOG.error("Failed to create ZkRealm: {}, Namespace: {}", realm, _namespace, e);
+      return false;
+    }
+
+    return true;
+  }
+}
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/metadatastore/concurrency/ZkDistributedLeaderElection.java b/helix-rest/src/main/java/org/apache/helix/rest/metadatastore/concurrency/ZkDistributedLeaderElection.java
new file mode 100644
index 0000000..c9b6bb2
--- /dev/null
+++ b/helix-rest/src/main/java/org/apache/helix/rest/metadatastore/concurrency/ZkDistributedLeaderElection.java
@@ -0,0 +1,142 @@
+package org.apache.helix.rest.metadatastore.concurrency;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Collections;
+import java.util.List;
+
+import org.I0Itec.zkclient.IZkDataListener;
+import org.I0Itec.zkclient.exception.ZkNodeExistsException;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.manager.zk.client.HelixZkClient;
+import org.apache.helix.manager.zk.zookeeper.IZkStateListener;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.Watcher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Leader election among participants that share a ZooKeeper base path.
+ * <p>
+ * Each participant creates an ephemeral sequential ZNode under the base path. The participant
+ * whose ZNode sorts first is the leader; every other participant watches the ZNode that sorts
+ * directly before its own and re-evaluates the election when that ZNode is deleted.
+ * <p>
+ * NOTE(review): this class implements {@link IZkStateListener} but does not subscribe itself to
+ * state changes of the ZkClient; whoever needs {@link #handleStateChanged} to fire has to
+ * register the instance.
+ */
+public class ZkDistributedLeaderElection implements IZkDataListener, IZkStateListener {
+  private static final Logger LOG = LoggerFactory.getLogger(ZkDistributedLeaderElection.class);
+  private static final String PREFIX = "MSDS_SERVER_";
+
+  private final HelixZkClient _zkClient;
+  private final String _basePath;
+  private final ZNRecord _participantInfo;
+  // Only populated once this participant itself has acquired leadership
+  private ZNRecord _currentLeaderInfo;
+
+  private String _myEphemeralSequentialPath;
+  private volatile boolean _isLeader;
+
+  /**
+   * Joins the election: creates this participant's ephemeral sequential ZNode and evaluates
+   * leadership once.
+   * @param zkClient open ZkClient to run the election on; its serializer is replaced with a
+   *          {@link ZNRecordSerializer}
+   * @param basePath ZNode under which all participants create their election ZNodes
+   * @param participantInfo data stored in this participant's election ZNode
+   * @throws IllegalArgumentException if zkClient is null or closed, or basePath is null or empty
+   */
+  public ZkDistributedLeaderElection(HelixZkClient zkClient, String basePath,
+      ZNRecord participantInfo) {
+    // Validate everything before touching the caller's ZkClient
+    if (zkClient == null || zkClient.isClosed()) {
+      throw new IllegalArgumentException("ZkClient cannot be null or closed!");
+    }
+    if (basePath == null || basePath.isEmpty()) {
+      throw new IllegalArgumentException("lockBasePath cannot be null or empty!");
+    }
+    _zkClient = zkClient;
+    _zkClient.setZkSerializer(new ZNRecordSerializer());
+    _basePath = basePath;
+    _participantInfo = participantInfo;
+    _isLeader = false;
+    init();
+  }
+
+  /**
+   * Create the base path if it doesn't exist and create an ephemeral sequential ZNode.
+   */
+  private synchronized void init() {
+    try {
+      _zkClient.createPersistent(_basePath, true);
+    } catch (ZkNodeExistsException ignored) {
+      // Okay if it exists already
+    }
+
+    // Create my ephemeral sequential node with my information
+    _myEphemeralSequentialPath = _zkClient
+        .create(_basePath + "/" + PREFIX, _participantInfo, CreateMode.EPHEMERAL_SEQUENTIAL);
+    if (_myEphemeralSequentialPath == null) {
+      throw new IllegalStateException(
+          "Unable to create ephemeral sequential node at path: " + _basePath);
+    }
+    tryAcquiringLeadership();
+  }
+
+  /**
+   * Evaluates the election from the current children of the base path. Sets the leader flag to
+   * true only if this participant's ZNode sorts first; otherwise the flag is cleared and the
+   * ZNode directly before this participant's is watched for deletion.
+   */
+  private synchronized void tryAcquiringLeadership() {
+    String[] myPathSegments = _myEphemeralSequentialPath.split("/");
+    String myName = myPathSegments[myPathSegments.length - 1];
+
+    while (true) {
+      List<String> children = _zkClient.getChildren(_basePath);
+      Collections.sort(children);
+
+      int myIndex = children.indexOf(myName);
+      if (myIndex < 0) {
+        // Our own ZNode is gone, so we are not part of the election and cannot be the leader
+        _isLeader = false;
+        LOG.error("Election ZNode {} no longer exists under {}; not participating in the election!",
+            myName, _basePath);
+        return;
+      }
+
+      if (myIndex == 0) {
+        // My turn for leadership
+        ZNRecord leaderInfo = _zkClient.readData(_basePath + "/" + myName, true);
+        _isLeader = true;
+        _currentLeaderInfo = leaderInfo;
+        LOG.info("{} acquired leadership! Info: {}", myName, leaderInfo);
+        return;
+      }
+
+      // Somebody else sorts first
+      _isLeader = false;
+      // Watch the ephemeral ZNode before me for a deletion event
+      String beforeMePath = _basePath + "/" + children.get(myIndex - 1);
+      _zkClient.subscribeDataChanges(beforeMePath, this);
+      if (_zkClient.exists(beforeMePath)) {
+        return;
+      }
+      // The ZNode before me vanished between listing the children and subscribing, so its
+      // deletion event will never arrive - evaluate the election again right away
+    }
+  }
+
+  /**
+   * @return true if this participant held leadership the last time the election was evaluated
+   */
+  public synchronized boolean isLeader() {
+    return _isLeader;
+  }
+
+  /**
+   * @return the data read from this participant's election ZNode when it acquired leadership;
+   *         null if this participant has never been the leader
+   */
+  public synchronized ZNRecord getCurrentLeaderInfo() {
+    return _currentLeaderInfo;
+  }
+
+  /**
+   * Rejoins the election after a reconnect, but only if this participant's election ZNode did
+   * not survive; creating another ZNode while the old one still exists would leave a stale
+   * entry of this same participant in front of the new one.
+   */
+  @Override
+  public synchronized void handleStateChanged(Watcher.Event.KeeperState state) {
+    if (state != Watcher.Event.KeeperState.SyncConnected) {
+      return;
+    }
+    if (_myEphemeralSequentialPath != null && _zkClient.exists(_myEphemeralSequentialPath)) {
+      // Our election ZNode is still there, so the election state is still valid
+      return;
+    }
+    // Our election ZNode is gone: we hold no position until we have rejoined
+    _isLeader = false;
+    init();
+  }
+
+  /**
+   * No-op.
+   */
+  @Override
+  public void handleNewSession(String sessionId) {
+    return;
+  }
+
+  /**
+   * No-op.
+   */
+  @Override
+  public void handleSessionEstablishmentError(Throwable error) {
+    return;
+  }
+
+  /**
+   * No-op: only the deletion of the watched ZNode matters for the election.
+   */
+  @Override
+  public void handleDataChange(String s, Object o) {
+    return;
+  }
+
+  /**
+   * Called when the watched ZNode directly before this participant's is deleted; re-evaluates
+   * the election.
+   */
+  @Override
+  public void handleDataDeleted(String s) {
+    tryAcquiringLeadership();
+  }
+}
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/metadatastore/constant/MetadataStoreRoutingConstants.java b/helix-rest/src/main/java/org/apache/helix/rest/metadatastore/constant/MetadataStoreRoutingConstants.java
index fda355b..e4240e7 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/metadatastore/constant/MetadataStoreRoutingConstants.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/metadatastore/constant/MetadataStoreRoutingConstants.java
@@ -24,4 +24,7 @@ public class MetadataStoreRoutingConstants {
// For ZK only
public static final String ZNRECORD_LIST_FIELD_KEY = "ZK_PATH_SHARDING_KEYS";
+
+ // Leader election ZNode for ZkRoutingDataWriter
+ public static final String LEADER_ELECTION_ZNODE = "/_ZK_ROUTING_DATA_WRITER_LEADER";
}
diff --git a/helix-rest/src/test/java/org/apache/helix/rest/metadatastore/TestZkRoutingDataReader.java b/helix-rest/src/test/java/org/apache/helix/rest/metadatastore/accessor/TestZkRoutingDataReader.java
similarity index 99%
rename from helix-rest/src/test/java/org/apache/helix/rest/metadatastore/TestZkRoutingDataReader.java
rename to helix-rest/src/test/java/org/apache/helix/rest/metadatastore/accessor/TestZkRoutingDataReader.java
index 4479f68..77eb5eb 100644
--- a/helix-rest/src/test/java/org/apache/helix/rest/metadatastore/TestZkRoutingDataReader.java
+++ b/helix-rest/src/test/java/org/apache/helix/rest/metadatastore/accessor/TestZkRoutingDataReader.java
@@ -1,4 +1,4 @@
-package org.apache.helix.rest.metadatastore;
+package org.apache.helix.rest.metadatastore.accessor;
/*
* Licensed to the Apache Software Foundation (ASF) under one
diff --git a/helix-rest/src/test/java/org/apache/helix/rest/metadatastore/accessor/TestZkRoutingDataWriter.java b/helix-rest/src/test/java/org/apache/helix/rest/metadatastore/accessor/TestZkRoutingDataWriter.java
new file mode 100644
index 0000000..441bf65
--- /dev/null
+++ b/helix-rest/src/test/java/org/apache/helix/rest/metadatastore/accessor/TestZkRoutingDataWriter.java
@@ -0,0 +1,107 @@
+package org.apache.helix.rest.metadatastore.accessor;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.helix.AccessOption;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.rest.metadatastore.constant.MetadataStoreRoutingConstants;
+import org.apache.helix.rest.server.AbstractTestClass;
+import org.junit.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+/**
+ * Tests the {@link MetadataStoreRoutingDataWriter} operations of {@link ZkRoutingDataWriter} by
+ * performing each write and then reading the realm ZNode back through {@code _baseAccessor}.
+ * The test methods are chained with {@code dependsOnMethods}, so each one runs against the state
+ * left behind by the previous one.
+ */
+public class TestZkRoutingDataWriter extends AbstractTestClass {
+  private static final String DUMMY_NAMESPACE = "NAMESPACE";
+  private static final String DUMMY_REALM = "REALM";
+  private static final String DUMMY_SHARDING_KEY = "SHARDING_KEY";
+  // Path of the ZNode that the tests read back for DUMMY_REALM
+  private static final String DUMMY_REALM_PATH =
+      MetadataStoreRoutingConstants.ROUTING_DATA_PATH + "/" + DUMMY_REALM;
+
+  private MetadataStoreRoutingDataWriter _zkRoutingDataWriter;
+
+  @BeforeClass
+  public void beforeClass() {
+    _zkRoutingDataWriter = new ZkRoutingDataWriter(DUMMY_NAMESPACE, ZK_ADDR);
+  }
+
+  @AfterClass
+  public void afterClass() {
+    try {
+      _baseAccessor
+          .remove(MetadataStoreRoutingConstants.ROUTING_DATA_PATH, AccessOption.PERSISTENT);
+    } finally {
+      // Close the writer even when the cleanup above throws, so it is never left open
+      _zkRoutingDataWriter.close();
+    }
+  }
+
+  /** Adding a realm must leave a readable ZNode at the realm's path. */
+  @Test
+  public void testAddMetadataStoreRealm() {
+    _zkRoutingDataWriter.addMetadataStoreRealm(DUMMY_REALM);
+    Assert.assertNotNull(getRealmRecord());
+  }
+
+  /** Deleting the realm must remove the ZNode at the realm's path. */
+  @Test(dependsOnMethods = "testAddMetadataStoreRealm")
+  public void testDeleteMetadataStoreRealm() {
+    _zkRoutingDataWriter.deleteMetadataStoreRealm(DUMMY_REALM);
+    Assert.assertFalse(_baseAccessor.exists(DUMMY_REALM_PATH, AccessOption.PERSISTENT));
+  }
+
+  /**
+   * Runs after the realm has been deleted; the realm ZNode is expected to exist again after the
+   * sharding key is added, and to list that key.
+   */
+  @Test(dependsOnMethods = "testDeleteMetadataStoreRealm")
+  public void testAddShardingKey() {
+    _zkRoutingDataWriter.addShardingKey(DUMMY_REALM, DUMMY_SHARDING_KEY);
+    ZNRecord znRecord = getRealmRecord();
+    Assert.assertNotNull(znRecord);
+    Assert.assertTrue(znRecord.getListField(MetadataStoreRoutingConstants.ZNRECORD_LIST_FIELD_KEY)
+        .contains(DUMMY_SHARDING_KEY));
+  }
+
+  /** Deleting the sharding key must keep the realm ZNode but drop the key from its list. */
+  @Test(dependsOnMethods = "testAddShardingKey")
+  public void testDeleteShardingKey() {
+    _zkRoutingDataWriter.deleteShardingKey(DUMMY_REALM, DUMMY_SHARDING_KEY);
+    ZNRecord znRecord = getRealmRecord();
+    Assert.assertNotNull(znRecord);
+    Assert.assertFalse(znRecord.getListField(MetadataStoreRoutingConstants.ZNRECORD_LIST_FIELD_KEY)
+        .contains(DUMMY_SHARDING_KEY));
+  }
+
+  /** Setting routing data with one realm and one key must yield exactly that key in the ZNode. */
+  @Test(dependsOnMethods = "testDeleteShardingKey")
+  public void testSetRoutingData() {
+    Map<String, List<String>> testRoutingDataMap =
+        ImmutableMap.of(DUMMY_REALM, Collections.singletonList(DUMMY_SHARDING_KEY));
+    _zkRoutingDataWriter.setRoutingData(testRoutingDataMap);
+    ZNRecord znRecord = getRealmRecord();
+    Assert.assertNotNull(znRecord);
+    List<String> shardingKeys =
+        znRecord.getListField(MetadataStoreRoutingConstants.ZNRECORD_LIST_FIELD_KEY);
+    // org.junit.Assert takes (expected, actual); keep this order so failure messages read right
+    Assert.assertEquals(1, shardingKeys.size());
+    Assert.assertTrue(shardingKeys.contains(DUMMY_SHARDING_KEY));
+  }
+
+  /**
+   * Reads the ZNRecord stored at the dummy realm's path.
+   * @return the value returned by {@code _baseAccessor.get} for that path; the tests assert on
+   *         whether it is null
+   */
+  private ZNRecord getRealmRecord() {
+    return _baseAccessor.get(DUMMY_REALM_PATH, null, AccessOption.PERSISTENT);
+  }
+}
diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/AbstractTestClass.java b/helix-rest/src/test/java/org/apache/helix/rest/server/AbstractTestClass.java
index e6ecb82..c5ffd41 100644
--- a/helix-rest/src/test/java/org/apache/helix/rest/server/AbstractTestClass.java
+++ b/helix-rest/src/test/java/org/apache/helix/rest/server/AbstractTestClass.java
@@ -269,19 +269,10 @@ public class AbstractTestClass extends JerseyTestNg.ContainerPerClassTest {
_gZkClient = null;
}
- if (_zkServer != null) {
- TestHelper.stopZkServer(_zkServer);
- _zkServer = null;
- }
-
if (_gZkClientTestNS != null) {
_gZkClientTestNS.close();
_gZkClientTestNS = null;
}
- if (_zkServerTestNS != null) {
- TestHelper.stopZkServer(_zkServerTestNS);
- _zkServerTestNS = null;
- }
if (_helixRestServer != null) {
_helixRestServer.shutdown();