You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by xy...@apache.org on 2023/01/09 21:47:41 UTC

[helix] branch master updated: Federated Zk Client Multi implementation for Zk transactional support.

This is an automated email from the ASF dual-hosted git repository.

xyuanlu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new fb7738391 Federated Zk Client Multi implementation for Zk transactional support.
fb7738391 is described below

commit fb773839117ee0689451fab7c8b949115f555100
Author: Marcos Rico Peng <55...@users.noreply.github.com>
AuthorDate: Mon Jan 9 22:47:35 2023 +0100

    Federated Zk Client Multi implementation for Zk transactional support.
    
    Federated Zk Client Multi implementation for Zk transactional support and Test Cases
    Co-authored-by: mapeng <ma...@linkedin.com>
---
 .../integration/multizk/TestMultiInMultiZk.java    |   7 +-
 .../multizk/TestMultiZkHelixJavaApis.java          |   8 +-
 .../zookeeper/impl/client/FederatedZkClient.java   |  28 +++-
 .../impl/client/RealmAwareZkClientTestBase.java    | 143 ++++++++++++++++++++-
 .../impl/client/TestFederatedZkClient.java         |  24 ++--
 5 files changed, 183 insertions(+), 27 deletions(-)

diff --git a/helix-core/src/test/java/org/apache/helix/integration/multizk/TestMultiInMultiZk.java b/helix-core/src/test/java/org/apache/helix/integration/multizk/TestMultiInMultiZk.java
index 03d6026ad..697dda3ac 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/multizk/TestMultiInMultiZk.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/multizk/TestMultiInMultiZk.java
@@ -87,9 +87,10 @@ public class TestMultiInMultiZk extends MultiZkTestBase {
         try {
             //Execute transactional support on operations and verify they were run
             _zkClient.multi(ops);
-            Assert.fail("Should have thrown an exception. Multi not supported");
-        } catch (UnsupportedOperationException ex) {
-            Assert.assertTrue(ex.getMessage().startsWith("Session-aware operation is not supported by FederatedZkClient."));
+            Assert.fail("Should have thrown an exception. Cannot run multi on ops of different servers.");
+        } catch (IllegalArgumentException e) {
+            boolean pathExists = _zkClient.exists("/" + CLUSTER_LIST.get(0) + "/test");
+            Assert.assertFalse(pathExists, "Path should not have been created.");
         }
         System.out.println("END " + _className + "_" + methodName + " at " + new Date(System.currentTimeMillis()));
     }
diff --git a/helix-core/src/test/java/org/apache/helix/integration/multizk/TestMultiZkHelixJavaApis.java b/helix-core/src/test/java/org/apache/helix/integration/multizk/TestMultiZkHelixJavaApis.java
index 7756296c7..3bc0cf6d1 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/multizk/TestMultiZkHelixJavaApis.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/multizk/TestMultiZkHelixJavaApis.java
@@ -32,7 +32,13 @@ import java.util.Set;
 import java.util.Date;
 
 import com.google.common.collect.ImmutableMap;
-import org.apache.helix.*;
+import org.apache.helix.AccessOption;
+import org.apache.helix.BaseDataAccessor;
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixException;
+import org.apache.helix.InstanceType;
+import org.apache.helix.TestHelper;
 import org.apache.helix.api.config.RebalanceConfig;
 import org.apache.helix.controller.rebalancer.DelayedAutoRebalancer;
 import org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy;
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/FederatedZkClient.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/FederatedZkClient.java
index 5ef808267..16baf1a9b 100644
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/FederatedZkClient.java
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/FederatedZkClient.java
@@ -19,12 +19,11 @@ package org.apache.helix.zookeeper.impl.client;
  * under the License.
  */
 
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.NoSuchElementException;
+import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
 
 import org.apache.helix.msdcommon.datamodel.MetadataStoreRoutingData;
 import org.apache.helix.msdcommon.exception.InvalidRoutingDataException;
@@ -477,10 +476,27 @@ public class FederatedZkClient implements RealmAwareZkClient {
     return getZkClient(path).getCreationTime(path);
   }
 
+  /**
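+   * Executes ZkMulti on operations whose paths all map to the same ZK realm.
+   * Throws before executing anything if any operation's path maps to a different realm.
+   * @param ops operations to execute as a single transaction; must not be null
+   * @return the results returned by the multi call on that realm's ZkClient
+   * @throws IllegalArgumentException if the ops do not all map to the same ZK realm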
+   */
   @Override
   public List<OpResult> multi(Iterable<Op> ops) {
-    throwUnsupportedOperationException();
-    return null;
+    if (ops == null) {
+      throw new NullPointerException("ops must not be null.");
+    }
+    boolean anyDifferent = StreamSupport.stream(ops.spliterator(), false)
+            .map(op -> getZkRealm(op.getPath()))
+            .anyMatch(s -> !s.equals(getZkRealm(ops.iterator().next().getPath())));
+
+    if (anyDifferent) {
+      throw new IllegalArgumentException("Cannot execute multi on ops of different realms!");
+    }
+    // All ops map to the same ZK realm, so run multi on the client for the first op's path
+    return getZkClient(ops.iterator().next().getPath()).multi(ops);
   }
 
   @Override
diff --git a/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/RealmAwareZkClientTestBase.java b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/RealmAwareZkClientTestBase.java
index 82bec82e5..49bb78775 100644
--- a/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/RealmAwareZkClientTestBase.java
+++ b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/RealmAwareZkClientTestBase.java
@@ -20,6 +20,8 @@ package org.apache.helix.zookeeper.impl.client;
  */
 
 import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
 
 import org.apache.helix.msdcommon.constant.MetadataStoreRoutingConstants;
 import org.apache.helix.msdcommon.exception.InvalidRoutingDataException;
@@ -28,8 +30,14 @@ import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
 import org.apache.helix.zookeeper.api.factory.RealmAwareZkClientFactory;
 import org.apache.helix.zookeeper.constant.TestConstants;
 import org.apache.helix.zookeeper.impl.ZkTestBase;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.Op;
+import org.apache.zookeeper.OpResult;
+import org.apache.zookeeper.ZooDefs;
+import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
 
 
 public abstract class RealmAwareZkClientTestBase extends ZkTestBase {
@@ -42,6 +50,7 @@ public abstract class RealmAwareZkClientTestBase extends ZkTestBase {
   protected static final String MSDS_HOSTNAME = "localhost";
   protected static final int MSDS_PORT = 19910;
   protected static final String MSDS_NAMESPACE = "test";
+  protected static String PARENT_PATH = ZK_SHARDING_KEY_PREFIX + "/RealmAwareZkClient";
   protected RealmAwareZkClient _realmAwareZkClient;
   protected RealmAwareZkClientFactory _realmAwareZkClientFactory;
 
@@ -51,13 +60,13 @@ public abstract class RealmAwareZkClientTestBase extends ZkTestBase {
     if (_msdsServer == null) {
       // Do not create again if Mock MSDS server has already been created by other tests
       _msdsServer = new MockMetadataStoreDirectoryServer(MSDS_HOSTNAME, MSDS_PORT, MSDS_NAMESPACE,
-          TestConstants.FAKE_ROUTING_DATA);
+              TestConstants.FAKE_ROUTING_DATA);
       _msdsServer.startServer();
     }
 
     // Register the MSDS endpoint as a System variable
     String msdsEndpoint =
-        "http://" + MSDS_HOSTNAME + ":" + MSDS_PORT + "/admin/v2/namespaces/" + MSDS_NAMESPACE;
+            "http://" + MSDS_HOSTNAME + ":" + MSDS_PORT + "/admin/v2/namespaces/" + MSDS_NAMESPACE;
     System.setProperty(MetadataStoreRoutingConstants.MSDS_SERVER_ENDPOINT_KEY, msdsEndpoint);
   }
 
@@ -67,4 +76,132 @@ public abstract class RealmAwareZkClientTestBase extends ZkTestBase {
       _msdsServer.stopServer();
     }
   }
-}
+  /**
+   * Initializes the FederatedZkClient that the multi tests below run against.
+   */
+  @Test
+  public void testMultiSetup() throws InvalidRoutingDataException {
+    // Create a connection config with a valid sharding key
+    RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder builder =
+            new RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder();
+    RealmAwareZkClient.RealmAwareZkConnectionConfig connectionConfig =
+            builder.setZkRealmShardingKey(ZK_SHARDING_KEY_PREFIX).build();
+
+    _realmAwareZkClient = new FederatedZkClient(connectionConfig,
+              new RealmAwareZkClient.RealmAwareZkClientConfig());
+  }
+
+  /**
+   * Test that zk multi works for create.
+   */
+  @Test(dependsOnMethods = "testMultiSetup")
+  public void testMultiCreate() {
+    String test_name = "/test_multi_create";
+
+    //Create Nodes
+    List<Op> ops = Arrays.asList(
+            Op.create(PARENT_PATH, new byte[0],
+                    ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT),
+            Op.create(PARENT_PATH + test_name, new byte[0],
+                    ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT));
+
+    //Execute transactional support on operations and verify they were run
+    List<OpResult> opResults = _realmAwareZkClient.multi(ops);
+    Assert.assertTrue(opResults.get(0) instanceof OpResult.CreateResult);
+    Assert.assertTrue(opResults.get(1) instanceof OpResult.CreateResult);
+
+    //Verify that the znodes were created
+    Assert.assertTrue(_realmAwareZkClient.exists(PARENT_PATH), "Path has not been created.");
+    Assert.assertTrue(_realmAwareZkClient.exists(PARENT_PATH + test_name), "Path has not been created.");
+
+    cleanup();
+  }
+
+  /**
+   * Multi should be an all-or-nothing transaction: a batch that mixes valid
+   * paths with a single invalid one must fail without creating any of them.
+   */
+  @Test(dependsOnMethods = "testMultiCreate")
+  public void testMultiFail() {
+    String test_name = "/test_multi_fail";
+    //Create Nodes
+    List<Op> ops = Arrays.asList(
+            Op.create(PARENT_PATH, new byte[0],
+                    ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT),
+            Op.create(PARENT_PATH + test_name, new byte[0],
+                    ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT),
+            Op.create(TEST_INVALID_PATH, new byte[0],
+                    ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT));
+    try {
+      _realmAwareZkClient.multi(ops);
+      Assert.fail("Should have thrown an exception. Cannot run multi on incorrect path.");
+    } catch (Exception e) {
+      boolean pathExists = _realmAwareZkClient.exists(PARENT_PATH);
+      Assert.assertFalse(pathExists, "Path should not have been created.");
+
+      cleanup();
+    }
+  }
+
+  /**
+   * Test that zk multi works for delete.
+   */
+  @Test(dependsOnMethods = "testMultiFail")
+  public void testMultiDelete() {
+    String test_name = "/test_multi_delete";
+    //Create Nodes
+    List<Op> ops = Arrays.asList(
+            Op.create(PARENT_PATH, new byte[0],
+                    ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT),
+            Op.create(PARENT_PATH + test_name, new byte[0],
+                    ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT),
+            Op.delete(PARENT_PATH + test_name, -1));
+
+    List<OpResult> opResults = _realmAwareZkClient.multi(ops);
+    Assert.assertTrue(opResults.get(0) instanceof OpResult.CreateResult);
+    Assert.assertTrue(opResults.get(1) instanceof OpResult.CreateResult);
+    Assert.assertTrue(opResults.get(2) instanceof OpResult.DeleteResult);
+
+    Assert.assertTrue(_realmAwareZkClient.exists(PARENT_PATH), "Path has not been created.");
+    Assert.assertFalse(_realmAwareZkClient.exists(PARENT_PATH + test_name), "Path should have been removed.");
+
+    cleanup();
+  }
+
+  /**
+   * Test that zk multi works for set.
+   */
+  @Test(dependsOnMethods = "testMultiDelete")
+  public void testMultiSet() {
+    String test_name = "/test_multi_set";
+
+    List<Op> ops = Arrays.asList(
+            Op.create(PARENT_PATH, new byte[0],
+                    ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT),
+            Op.create(PARENT_PATH + test_name, new byte[0],
+                    ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT),
+            Op.setData(PARENT_PATH + test_name, new byte[0],
+                    -1));
+
+    List<OpResult> opResults = _realmAwareZkClient.multi(ops);
+    Assert.assertTrue(opResults.get(0) instanceof OpResult.CreateResult);
+    Assert.assertTrue(opResults.get(1) instanceof OpResult.CreateResult);
+    Assert.assertTrue(opResults.get(2) instanceof OpResult.SetDataResult);
+
+    Assert.assertTrue(_realmAwareZkClient.exists(PARENT_PATH), "Path has not been created.");
+    Assert.assertTrue(_realmAwareZkClient.exists(PARENT_PATH + test_name), "Path has not been created.");
+
+    cleanup();
+  }
+
+  /**
+   * Delete created paths to clean up zk for next multi test case.
+   */
+  public void cleanup() {
+    //Delete Parent path and its children
+    _realmAwareZkClient.deleteRecursively(PARENT_PATH);
+    //Verify path has been deleted
+    boolean pathExists = _realmAwareZkClient.exists(PARENT_PATH);
+    Assert.assertFalse(pathExists, "Parent Path should have been removed.");
+  }
+}
\ No newline at end of file
diff --git a/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestFederatedZkClient.java b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestFederatedZkClient.java
index c4bf2d2d1..ee45874b7 100644
--- a/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestFederatedZkClient.java
+++ b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestFederatedZkClient.java
@@ -20,13 +20,12 @@ package org.apache.helix.zookeeper.impl.client;
  */
 
 import java.io.IOException;
-import java.util.NoSuchElementException;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.List;
-import java.util.Arrays;
 import java.util.Map;
+import java.util.NoSuchElementException;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.helix.msdcommon.constant.MetadataStoreRoutingConstants;
@@ -42,9 +41,7 @@ import org.apache.helix.zookeeper.datamodel.serializer.ZNRecordSerializer;
 import org.apache.helix.zookeeper.routing.RoutingDataManager;
 import org.apache.helix.zookeeper.zkclient.IZkStateListener;
 import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.Op;
 import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooDefs;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
@@ -102,15 +99,6 @@ public class TestFederatedZkClient extends RealmAwareZkClientTestBase {
       Assert.assertTrue(ex.getMessage().startsWith(UNSUPPORTED_OPERATION_MESSAGE));
     }
 
-    List<Op> ops = Arrays.asList(Op.create(TEST_REALM_ONE_VALID_PATH, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
-            CreateMode.PERSISTENT), Op.delete(TEST_REALM_ONE_VALID_PATH, -1));
-    try {
-      _realmAwareZkClient.multi(ops);
-      Assert.fail("multi() should not be supported.");
-    } catch (UnsupportedOperationException ex) {
-      Assert.assertTrue(ex.getMessage().startsWith(UNSUPPORTED_OPERATION_MESSAGE));
-    }
-
     try {
       _realmAwareZkClient.getSessionId();
       Assert.fail("getSessionId() should not be supported.");
@@ -660,4 +648,12 @@ public class TestFederatedZkClient extends RealmAwareZkClientTestBase {
       Assert.assertEquals(ex.getMessage(), "FederatedZkClient is closed!");
     }
   }
+
+  @Override
+  public void testMultiSetup() throws InvalidRoutingDataException {
+    super.testMultiSetup();
+    if (!_realmAwareZkClient.exists(ZK_SHARDING_KEY_PREFIX)) {
+      _realmAwareZkClient.createPersistent(ZK_SHARDING_KEY_PREFIX);
+    }
+  }
 }