You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by xy...@apache.org on 2023/01/09 21:47:41 UTC
[helix] branch master updated: Federated Zk Client Multi implementation for Zk transactional support.
This is an automated email from the ASF dual-hosted git repository.
xyuanlu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/master by this push:
new fb7738391 Federated Zk Client Multi implementation for Zk transactional support.
fb7738391 is described below
commit fb773839117ee0689451fab7c8b949115f555100
Author: Marcos Rico Peng <55...@users.noreply.github.com>
AuthorDate: Mon Jan 9 22:47:35 2023 +0100
Federated Zk Client Multi implementation for Zk transactional support.
Federated Zk Client Multi implementation for Zk transactional support and Test Cases
Co-authored-by: mapeng <ma...@linkedin.com>
---
.../integration/multizk/TestMultiInMultiZk.java | 7 +-
.../multizk/TestMultiZkHelixJavaApis.java | 8 +-
.../zookeeper/impl/client/FederatedZkClient.java | 28 +++-
.../impl/client/RealmAwareZkClientTestBase.java | 143 ++++++++++++++++++++-
.../impl/client/TestFederatedZkClient.java | 24 ++--
5 files changed, 183 insertions(+), 27 deletions(-)
diff --git a/helix-core/src/test/java/org/apache/helix/integration/multizk/TestMultiInMultiZk.java b/helix-core/src/test/java/org/apache/helix/integration/multizk/TestMultiInMultiZk.java
index 03d6026ad..697dda3ac 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/multizk/TestMultiInMultiZk.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/multizk/TestMultiInMultiZk.java
@@ -87,9 +87,10 @@ public class TestMultiInMultiZk extends MultiZkTestBase {
try {
//Execute transactional support on operations and verify they were run
_zkClient.multi(ops);
- Assert.fail("Should have thrown an exception. Multi not supported");
- } catch (UnsupportedOperationException ex) {
- Assert.assertTrue(ex.getMessage().startsWith("Session-aware operation is not supported by FederatedZkClient."));
+ Assert.fail("Should have thrown an exception. Cannot run multi on ops of different servers.");
+ } catch (IllegalArgumentException e) {
+ boolean pathExists = _zkClient.exists("/" + CLUSTER_LIST.get(0) + "/test");
+ Assert.assertFalse(pathExists, "Path should not have been created.");
}
System.out.println("END " + _className + "_" + methodName + " at " + new Date(System.currentTimeMillis()));
}
diff --git a/helix-core/src/test/java/org/apache/helix/integration/multizk/TestMultiZkHelixJavaApis.java b/helix-core/src/test/java/org/apache/helix/integration/multizk/TestMultiZkHelixJavaApis.java
index 7756296c7..3bc0cf6d1 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/multizk/TestMultiZkHelixJavaApis.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/multizk/TestMultiZkHelixJavaApis.java
@@ -32,7 +32,13 @@ import java.util.Set;
import java.util.Date;
import com.google.common.collect.ImmutableMap;
-import org.apache.helix.*;
+import org.apache.helix.AccessOption;
+import org.apache.helix.BaseDataAccessor;
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixException;
+import org.apache.helix.InstanceType;
+import org.apache.helix.TestHelper;
import org.apache.helix.api.config.RebalanceConfig;
import org.apache.helix.controller.rebalancer.DelayedAutoRebalancer;
import org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy;
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/FederatedZkClient.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/FederatedZkClient.java
index 5ef808267..16baf1a9b 100644
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/FederatedZkClient.java
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/FederatedZkClient.java
@@ -19,12 +19,11 @@ package org.apache.helix.zookeeper.impl.client;
* under the License.
*/
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.NoSuchElementException;
+import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
import org.apache.helix.msdcommon.datamodel.MetadataStoreRoutingData;
import org.apache.helix.msdcommon.exception.InvalidRoutingDataException;
@@ -477,10 +476,27 @@ public class FederatedZkClient implements RealmAwareZkClient {
return getZkClient(path).getCreationTime(path);
}
+ /**
+ * Executes ZkMulti on operations that are connected to the same Zk server.
+ * Will throw exception if any operation's server connection is different.
+ * @param ops
+ * @return
+ * @throws IllegalArgumentException
+ */
@Override
public List<OpResult> multi(Iterable<Op> ops) {
- throwUnsupportedOperationException();
- return null;
+ if (ops == null) {
+ throw new NullPointerException("ops must not be null.");
+ }
+ boolean anyDifferent = StreamSupport.stream(ops.spliterator(), false)
+ .map(op -> getZkRealm(op.getPath()))
+ .anyMatch(s -> !s.equals(getZkRealm(ops.iterator().next().getPath())));
+
+ if (anyDifferent) {
+ throw new IllegalArgumentException("Cannot execute multi on ops of different realms!");
+ }
+ // No different zk realms so call multi on the realm of the first op
+ return getZkClient(ops.iterator().next().getPath()).multi(ops);
}
@Override
diff --git a/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/RealmAwareZkClientTestBase.java b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/RealmAwareZkClientTestBase.java
index 82bec82e5..49bb78775 100644
--- a/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/RealmAwareZkClientTestBase.java
+++ b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/RealmAwareZkClientTestBase.java
@@ -20,6 +20,8 @@ package org.apache.helix.zookeeper.impl.client;
*/
import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
import org.apache.helix.msdcommon.constant.MetadataStoreRoutingConstants;
import org.apache.helix.msdcommon.exception.InvalidRoutingDataException;
@@ -28,8 +30,14 @@ import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
import org.apache.helix.zookeeper.api.factory.RealmAwareZkClientFactory;
import org.apache.helix.zookeeper.constant.TestConstants;
import org.apache.helix.zookeeper.impl.ZkTestBase;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.Op;
+import org.apache.zookeeper.OpResult;
+import org.apache.zookeeper.ZooDefs;
+import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
public abstract class RealmAwareZkClientTestBase extends ZkTestBase {
@@ -42,6 +50,7 @@ public abstract class RealmAwareZkClientTestBase extends ZkTestBase {
protected static final String MSDS_HOSTNAME = "localhost";
protected static final int MSDS_PORT = 19910;
protected static final String MSDS_NAMESPACE = "test";
+ protected static String PARENT_PATH = ZK_SHARDING_KEY_PREFIX + "/RealmAwareZkClient";
protected RealmAwareZkClient _realmAwareZkClient;
protected RealmAwareZkClientFactory _realmAwareZkClientFactory;
@@ -51,13 +60,13 @@ public abstract class RealmAwareZkClientTestBase extends ZkTestBase {
if (_msdsServer == null) {
// Do not create again if Mock MSDS server has already been created by other tests
_msdsServer = new MockMetadataStoreDirectoryServer(MSDS_HOSTNAME, MSDS_PORT, MSDS_NAMESPACE,
- TestConstants.FAKE_ROUTING_DATA);
+ TestConstants.FAKE_ROUTING_DATA);
_msdsServer.startServer();
}
// Register the MSDS endpoint as a System variable
String msdsEndpoint =
- "http://" + MSDS_HOSTNAME + ":" + MSDS_PORT + "/admin/v2/namespaces/" + MSDS_NAMESPACE;
+ "http://" + MSDS_HOSTNAME + ":" + MSDS_PORT + "/admin/v2/namespaces/" + MSDS_NAMESPACE;
System.setProperty(MetadataStoreRoutingConstants.MSDS_SERVER_ENDPOINT_KEY, msdsEndpoint);
}
@@ -67,4 +76,132 @@ public abstract class RealmAwareZkClientTestBase extends ZkTestBase {
_msdsServer.stopServer();
}
}
-}
+  /**
+   * Prepares the client used by the multi test cases: a FederatedZkClient built from a
+   * connection config whose realm sharding key is ZK_SHARDING_KEY_PREFIX and a default
+   * client config.
+   */
+  @Test
+  public void testMultiSetup() throws InvalidRoutingDataException {
+    RealmAwareZkClient.RealmAwareZkConnectionConfig shardedConnectionConfig =
+        new RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder()
+            .setZkRealmShardingKey(ZK_SHARDING_KEY_PREFIX)
+            .build();
+    RealmAwareZkClient.RealmAwareZkClientConfig defaultClientConfig =
+        new RealmAwareZkClient.RealmAwareZkClientConfig();
+
+    _realmAwareZkClient = new FederatedZkClient(shardedConnectionConfig, defaultClientConfig);
+  }
+
+ /**
+ * Test that zk multi works for create.
+ */
+ @Test(dependsOnMethods = "testMultiSetup")
+ public void testMultiCreate() {
+ String test_name = "/test_multi_create";
+
+ //Create Nodes
+ List<Op> ops = Arrays.asList(
+ Op.create(PARENT_PATH, new byte[0],
+ ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT),
+ Op.create(PARENT_PATH + test_name, new byte[0],
+ ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT));
+
+ //Execute transactional support on operations and verify they were run
+ List<OpResult> opResults = _realmAwareZkClient.multi(ops);
+ Assert.assertTrue(opResults.get(0) instanceof OpResult.CreateResult);
+ Assert.assertTrue(opResults.get(1) instanceof OpResult.CreateResult);
+
+ //Verify that the znodes were created
+ Assert.assertTrue(_realmAwareZkClient.exists(PARENT_PATH), "Path has not been created.");
+ Assert.assertTrue(_realmAwareZkClient.exists(PARENT_PATH + test_name), "Path has not been created.");
+
+ cleanup();
+ }
+
+  /**
+   * Multi should be an all-or-nothing transaction: two creates on valid paths combined
+   * with one create on TEST_INVALID_PATH must fail as a whole and leave none of the
+   * valid paths behind.
+   */
+  @Test(dependsOnMethods = "testMultiCreate")
+  public void testMultiFail() {
+    final String childPath = PARENT_PATH + "/test_multi_fail";
+    //Create Nodes
+    List<Op> ops = Arrays.asList(
+        Op.create(PARENT_PATH, new byte[0],
+            ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT),
+        Op.create(childPath, new byte[0],
+            ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT),
+        Op.create(TEST_INVALID_PATH, new byte[0],
+            ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT));
+    try {
+      _realmAwareZkClient.multi(ops);
+      Assert.fail("Should have thrown an exception. Cannot run multi on incorrect path.");
+    } catch (Exception expected) {
+      // Neither of the valid paths may exist after the failed transaction
+      Assert.assertFalse(_realmAwareZkClient.exists(PARENT_PATH),
+          "Path should not have been created.");
+      Assert.assertFalse(_realmAwareZkClient.exists(childPath),
+          "Path should not have been created.");
+    } finally {
+      // Assert.fail raises an AssertionError, which the catch above does not intercept;
+      // cleaning up here keeps znodes from leaking into later tests on every exit path.
+      cleanup();
+    }
+  }
+
+ /**
+ * Test that zk multi works for delete.
+ */
+ @Test(dependsOnMethods = "testMultiFail")
+ public void testMultiDelete() {
+ String test_name = "/test_multi_delete";
+ //Create Nodes
+ List<Op> ops = Arrays.asList(
+ Op.create(PARENT_PATH, new byte[0],
+ ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT),
+ Op.create(PARENT_PATH + test_name, new byte[0],
+ ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT),
+ Op.delete(PARENT_PATH + test_name, -1));
+
+ List<OpResult> opResults = _realmAwareZkClient.multi(ops);
+ Assert.assertTrue(opResults.get(0) instanceof OpResult.CreateResult);
+ Assert.assertTrue(opResults.get(1) instanceof OpResult.CreateResult);
+ Assert.assertTrue(opResults.get(2) instanceof OpResult.DeleteResult);
+
+ Assert.assertTrue(_realmAwareZkClient.exists(PARENT_PATH), "Path has not been created.");
+ Assert.assertFalse(_realmAwareZkClient.exists(PARENT_PATH + test_name), "Path should have been removed.");
+
+ cleanup();
+ }
+
+ /**
+ * Test that zk multi works for set.
+ */
+ @Test(dependsOnMethods = "testMultiDelete")
+ public void testMultiSet() {
+ String test_name = "/test_multi_set";
+
+ List<Op> ops = Arrays.asList(
+ Op.create(PARENT_PATH, new byte[0],
+ ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT),
+ Op.create(PARENT_PATH + test_name, new byte[0],
+ ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT),
+ Op.setData(PARENT_PATH + test_name, new byte[0],
+ -1));
+
+ List<OpResult> opResults = _realmAwareZkClient.multi(ops);
+ Assert.assertTrue(opResults.get(0) instanceof OpResult.CreateResult);
+ Assert.assertTrue(opResults.get(1) instanceof OpResult.CreateResult);
+ Assert.assertTrue(opResults.get(2) instanceof OpResult.SetDataResult);
+
+ Assert.assertTrue(_realmAwareZkClient.exists(PARENT_PATH), "Path has not been created.");
+ Assert.assertTrue(_realmAwareZkClient.exists(PARENT_PATH + test_name), "Path has not been created.");
+
+ cleanup();
+ }
+
+  /**
+   * Resets ZooKeeper state between multi test cases by recursively deleting PARENT_PATH
+   * and asserting that it no longer exists afterwards.
+   */
+  public void cleanup() {
+    // Remove the parent znode together with everything below it
+    _realmAwareZkClient.deleteRecursively(PARENT_PATH);
+    // The next test case expects to start without the parent path
+    Assert.assertFalse(_realmAwareZkClient.exists(PARENT_PATH),
+        "Parent Path should have been removed.");
+  }
+}
\ No newline at end of file
diff --git a/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestFederatedZkClient.java b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestFederatedZkClient.java
index c4bf2d2d1..ee45874b7 100644
--- a/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestFederatedZkClient.java
+++ b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestFederatedZkClient.java
@@ -20,13 +20,12 @@ package org.apache.helix.zookeeper.impl.client;
*/
import java.io.IOException;
-import java.util.NoSuchElementException;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.Collection;
+import java.util.HashMap;
import java.util.List;
-import java.util.Arrays;
import java.util.Map;
+import java.util.NoSuchElementException;
import java.util.concurrent.TimeUnit;
import org.apache.helix.msdcommon.constant.MetadataStoreRoutingConstants;
@@ -42,9 +41,7 @@ import org.apache.helix.zookeeper.datamodel.serializer.ZNRecordSerializer;
import org.apache.helix.zookeeper.routing.RoutingDataManager;
import org.apache.helix.zookeeper.zkclient.IZkStateListener;
import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.Op;
import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooDefs;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
@@ -102,15 +99,6 @@ public class TestFederatedZkClient extends RealmAwareZkClientTestBase {
Assert.assertTrue(ex.getMessage().startsWith(UNSUPPORTED_OPERATION_MESSAGE));
}
- List<Op> ops = Arrays.asList(Op.create(TEST_REALM_ONE_VALID_PATH, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
- CreateMode.PERSISTENT), Op.delete(TEST_REALM_ONE_VALID_PATH, -1));
- try {
- _realmAwareZkClient.multi(ops);
- Assert.fail("multi() should not be supported.");
- } catch (UnsupportedOperationException ex) {
- Assert.assertTrue(ex.getMessage().startsWith(UNSUPPORTED_OPERATION_MESSAGE));
- }
-
try {
_realmAwareZkClient.getSessionId();
Assert.fail("getSessionId() should not be supported.");
@@ -660,4 +648,12 @@ public class TestFederatedZkClient extends RealmAwareZkClientTestBase {
Assert.assertEquals(ex.getMessage(), "FederatedZkClient is closed!");
}
}
+
+ @Override
+ public void testMultiSetup() throws InvalidRoutingDataException {
+ super.testMultiSetup();
+ if (!_realmAwareZkClient.exists(ZK_SHARDING_KEY_PREFIX)) {
+ _realmAwareZkClient.createPersistent(ZK_SHARDING_KEY_PREFIX);
+ }
+ }
}