You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by xy...@apache.org on 2023/01/25 18:34:02 UTC

[helix] branch metaclient updated: ZkMetaclient - implementation of TransactionOp

This is an automated email from the ASF dual-hosted git repository.

xyuanlu pushed a commit to branch metaclient
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/metaclient by this push:
     new c18ed2012 ZkMetaclient - implementation of TransactionOp
c18ed2012 is described below

commit c18ed20122ea1ad7f382da5a7cd40ddf5b18bdd6
Author: Marcos Rico Peng <55...@users.noreply.github.com>
AuthorDate: Wed Jan 25 13:33:56 2023 -0500

    ZkMetaclient - implementation of TransactionOp
    
    1. Implemented the transactionOP() method in ZkMetaClient. It provides transactional support by converting MetaClient ops into ZooKeeper ops and running them through the ZkClient multi() call; the resulting ZooKeeper OpResults are then converted back into MetaClient OpResults.
    2. Created a separate utility class, ZkMetaClientUtil, for shared helper functions and the conversion logic behind transactionOP().
---
 .../java/org/apache/helix/metaclient/api/Op.java   |   2 +-
 .../helix/metaclient/impl/zk/ZkMetaClient.java     |  57 ++----
 .../metaclient/impl/zk/util/ZkMetaClientUtil.java  | 213 +++++++++++++++++++++
 .../helix/metaclient/impl/zk/TestZkMetaClient.java |  75 +++++++-
 4 files changed, 301 insertions(+), 46 deletions(-)

diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/api/Op.java b/meta-client/src/main/java/org/apache/helix/metaclient/api/Op.java
index 483be92b2..4b4deffeb 100644
--- a/meta-client/src/main/java/org/apache/helix/metaclient/api/Op.java
+++ b/meta-client/src/main/java/org/apache/helix/metaclient/api/Op.java
@@ -24,7 +24,7 @@ package org.apache.helix.metaclient.api;
  *  version check or delete operation.
  */
 public abstract class Op {
-  enum Type {
+  public enum Type {
     CREATE,
     DELETE,
     SET,
diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java
index 631bd1a3a..4fa0ca73b 100644
--- a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java
+++ b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java
@@ -33,26 +33,21 @@ import org.apache.helix.metaclient.api.DirectChildSubscribeResult;
 import org.apache.helix.metaclient.api.MetaClientInterface;
 import org.apache.helix.metaclient.api.Op;
 import org.apache.helix.metaclient.api.OpResult;
-import org.apache.helix.metaclient.constants.MetaClientBadVersionException;
 import org.apache.helix.metaclient.constants.MetaClientException;
-import org.apache.helix.metaclient.constants.MetaClientInterruptException;
-import org.apache.helix.metaclient.constants.MetaClientNoNodeException;
-import org.apache.helix.metaclient.constants.MetaClientTimeoutException;
 import org.apache.helix.metaclient.impl.zk.adapter.DataListenerAdapter;
 import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientConfig;
+import org.apache.helix.metaclient.impl.zk.util.ZkMetaClientUtil;
 import org.apache.helix.zookeeper.impl.client.ZkClient;
 import org.apache.helix.zookeeper.zkclient.ZkConnection;
-import org.apache.helix.zookeeper.zkclient.exception.ZkBadVersionException;
 import org.apache.helix.zookeeper.zkclient.exception.ZkException;
-import org.apache.helix.zookeeper.zkclient.exception.ZkInterruptedException;
-import org.apache.helix.zookeeper.zkclient.exception.ZkNodeExistsException;
-import org.apache.helix.zookeeper.zkclient.exception.ZkTimeoutException;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.server.EphemeralType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
+import static org.apache.helix.metaclient.impl.zk.util.ZkMetaClientUtil.convertZkEntryMode;
+import static org.apache.helix.metaclient.impl.zk.util.ZkMetaClientUtil.translateZkExceptionToMetaclientException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class ZkMetaClient<T> implements MetaClientInterface<T>, AutoCloseable {
   private static final Logger LOG  = LoggerFactory.getLogger(ZkMetaClient.class);
@@ -140,11 +135,6 @@ public class ZkMetaClient<T> implements MetaClientInterface<T>, AutoCloseable {
     return _zkClient.readData(key, true);
   }
 
-  @Override
-  public List<OpResult> transactionOP(Iterable<Op> ops) {
-    return null;
-  }
-
   @Override
   public List<String> getDirectChildrenKeys(String key) {
     try {
@@ -338,34 +328,13 @@ public class ZkMetaClient<T> implements MetaClientInterface<T>, AutoCloseable {
     disconnect();
   }
 
-  private static MetaClientException translateZkExceptionToMetaclientException(ZkException e) {
-    if (e instanceof ZkNodeExistsException) {
-      return new MetaClientNoNodeException(e);
-    } else if (e instanceof ZkBadVersionException) {
-      return new MetaClientBadVersionException(e);
-    } else if (e instanceof ZkTimeoutException) {
-      return new MetaClientTimeoutException(e);
-    } else if (e instanceof ZkInterruptedException) {
-      return new MetaClientInterruptException(e);
-    } else {
-      return new MetaClientException(e);
-    }
-  }
-
-  private static EntryMode convertZkEntryMode(long ephemeralOwner) {
-    EphemeralType zkEphemeralType = EphemeralType.get(ephemeralOwner);
-    switch (zkEphemeralType) {
-      case VOID:
-        return EntryMode.PERSISTENT;
-      case CONTAINER:
-        return EntryMode.CONTAINER;
-      case NORMAL:
-        return EntryMode.EPHEMERAL;
-      // TODO: TTL is not supported now.
-      //case TTL:
-      //  return EntryMode.TTL;
-      default:
-        throw new IllegalArgumentException(zkEphemeralType + " is not supported.");
-    }
+  /**
+   * Runs the given MetaClient operations through the ZkClient multi call and returns one
+   * MetaClient OpResult per ZooKeeper result.
+   * @param ops MetaClient operations to execute
+   * @return the MetaClient OpResults converted from the ZooKeeper results
+   * @throws MetaClientException if the ZkClient multi call fails with a ZkException
+   */
+  @Override
+  public List<OpResult> transactionOP(Iterable<Op> ops) {
+    // Convert list of MetaClient Ops to Zk Ops
+    List<org.apache.zookeeper.Op> zkOps = ZkMetaClientUtil.metaClientOpsToZkOps(ops);
+    try {
+      // Execute Zk transactional support
+      List<org.apache.zookeeper.OpResult> zkResult = _zkClient.multi(zkOps);
+      // Convert list of Zk OpResults to MetaClient OpResults
+      return ZkMetaClientUtil.zkOpResultToMetaClientOpResults(zkResult);
+    } catch (ZkException e) {
+      // Surface ZooKeeper failures as MetaClient exceptions instead of leaking the Zk type.
+      throw translateZkExceptionToMetaclientException(e);
+    }
   }
 }
diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/util/ZkMetaClientUtil.java b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/util/ZkMetaClientUtil.java
new file mode 100644
index 000000000..94c83d725
--- /dev/null
+++ b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/util/ZkMetaClientUtil.java
@@ -0,0 +1,213 @@
+package org.apache.helix.metaclient.impl.zk.util;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.metaclient.api.MetaClientInterface;
+import org.apache.helix.metaclient.api.OpResult;
+import org.apache.helix.metaclient.constants.*;
+import org.apache.helix.zookeeper.zkclient.exception.*;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.Op;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.server.EphemeralType;
+
+import java.util.Collections;
+import java.util.EnumMap;
+import java.util.List;
+import java.util.Map;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.function.Function;
+
+public class ZkMetaClientUtil {
+  //TODO Implement MetaClient ACL
+  // Default ACL (read-only view of ZooKeeper's OPEN_ACL_UNSAFE) used for create ops until the metaClient Op has an ACL of its own.
+  private static final List<ACL> DEFAULT_ACL = Collections.unmodifiableList(ZooDefs.Ids.OPEN_ACL_UNSAFE);
+
+  private ZkMetaClientUtil(){ // not instantiable: every member of this class is static
+  }
+
+  /**
+   * Helper for transactionOP: translates each MetaClient Op into its ZooKeeper Op
+   * counterpart so the batch can be handed to the zk transactional call.
+   * @param ops MetaClient operations to translate
+   * @return the translated ZooKeeper ops, in the iteration order of {@code ops}
+   * @throws IllegalArgumentException if an op's type has no registered converter
+   */
+  public static List<Op> metaClientOpsToZkOps(Iterable<org.apache.helix.metaclient.api.Op> ops) {
+    List<Op> converted = new ArrayList<>();
+    for (org.apache.helix.metaclient.api.Op metaClientOp : ops) {
+      Function<org.apache.helix.metaclient.api.Op, Op> converter = getOpMap().get(metaClientOp.getType());
+      if (converter == null) {
+        throw new IllegalArgumentException("Op type " + metaClientOp.getType().name() + " is not supported.");
+      }
+      converted.add(converter.apply(metaClientOp));
+    }
+    return converted;
+  }
+
+  /**
+   * Holder for the table that maps each MetaClient op type to the function converting an op
+   * of that type into a ZooKeeper Op. The table is built in this class's static initializer.
+   */
+  private static final class OpMapHolder {
+    static final Map<org.apache.helix.metaclient.api.Op.Type, Function<org.apache.helix.metaclient.api.Op, Op>> OPMAP = initializeOpMap();
+
+    // Registers one converter for each of CREATE, DELETE, SET and CHECK.
+    private static Map<org.apache.helix.metaclient.api.Op.Type, Function<org.apache.helix.metaclient.api.Op, Op>> initializeOpMap() {
+      Map<org.apache.helix.metaclient.api.Op.Type, Function<org.apache.helix.metaclient.api.Op, Op>> converters =
+          new EnumMap<>(org.apache.helix.metaclient.api.Op.Type.class);
+
+      converters.put(org.apache.helix.metaclient.api.Op.Type.CREATE, op -> {
+        org.apache.helix.metaclient.api.Op.Create createOp = (org.apache.helix.metaclient.api.Op.Create) op;
+        try {
+          CreateMode zkMode = convertMetaClientMode(createOp.getEntryMode());
+          return Op.create(createOp.getPath(), createOp.getData(), DEFAULT_ACL, zkMode);
+        } catch (KeeperException e) {
+          throw translateZkExceptionToMetaclientException(ZkException.create(e));
+        }
+      });
+
+      converters.put(org.apache.helix.metaclient.api.Op.Type.DELETE, op -> {
+        org.apache.helix.metaclient.api.Op.Delete deleteOp = (org.apache.helix.metaclient.api.Op.Delete) op;
+        return Op.delete(deleteOp.getPath(), deleteOp.getVersion());
+      });
+
+      converters.put(org.apache.helix.metaclient.api.Op.Type.SET, op -> {
+        org.apache.helix.metaclient.api.Op.Set setOp = (org.apache.helix.metaclient.api.Op.Set) op;
+        return Op.setData(setOp.getPath(), setOp.getData(), setOp.getVersion());
+      });
+
+      converters.put(org.apache.helix.metaclient.api.Op.Type.CHECK, op -> {
+        org.apache.helix.metaclient.api.Op.Check checkOp = (org.apache.helix.metaclient.api.Op.Check) op;
+        return Op.check(checkOp.getPath(), checkOp.getVersion());
+      });
+
+      return converters;
+    }
+  }
+
+  // Accessor for the converter table held by OpMapHolder.
+  private static Map<org.apache.helix.metaclient.api.Op.Type, Function<org.apache.helix.metaclient.api.Op, Op>> getOpMap() {
+    return OpMapHolder.OPMAP;
+  }
+
+  private static CreateMode convertMetaClientMode(MetaClientInterface.EntryMode entryMode) throws KeeperException {
+    switch (entryMode) {
+      case PERSISTENT:
+        return CreateMode.PERSISTENT;
+      case EPHEMERAL:
+        return CreateMode.EPHEMERAL;
+      case CONTAINER:
+        return CreateMode.CONTAINER;
+      default:
+        throw new IllegalArgumentException(entryMode.name() + " is not a supported EntryMode.");
+    }
+  }
+
+  /**
+   * Helper function for transactionOP. Converts the results returned by the zk transactional
+   * call into MetaClient OpResults.
+   * @param zkResult ZooKeeper OpResults to convert
+   * @return the converted MetaClient OpResults, in the same order as {@code zkResult}
+   * @throws IllegalArgumentException if a result's class has no registered converter
+   */
+  public static List<OpResult> zkOpResultToMetaClientOpResults(List<org.apache.zookeeper.OpResult> zkResult) {
+    List<OpResult> metaClientOpResult = new ArrayList<>(zkResult.size());
+    for (org.apache.zookeeper.OpResult opResult : zkResult) {
+      // Converters are looked up by the exact runtime class of the ZooKeeper result.
+      Function<org.apache.zookeeper.OpResult, OpResult> function = getOpResultMap().get(opResult.getClass());
+      if (function == null) {
+        throw new IllegalArgumentException("OpResult type " + opResult.getType() + " is not supported.");
+      }
+      metaClientOpResult.add(function.apply(opResult));
+    }
+
+    return metaClientOpResult;
+  }
+
+  /**
+   * Holder for the table that maps each ZooKeeper OpResult class to the function converting a
+   * result of that class into the corresponding MetaClient OpResult. The table is built in
+   * this class's static initializer.
+   */
+  private static final class OpResultMapHolder {
+    static final Map<Class<? extends org.apache.zookeeper.OpResult>, Function<org.apache.zookeeper.OpResult, OpResult>> OPRESULTMAP = initializeOpResultMap();
+
+    // Registers one converter per ZooKeeper OpResult class handled by MetaClient.
+    private static Map<Class<? extends org.apache.zookeeper.OpResult>, Function<org.apache.zookeeper.OpResult, OpResult>> initializeOpResultMap() {
+      Map<Class<? extends org.apache.zookeeper.OpResult>, Function<org.apache.zookeeper.OpResult, OpResult>> opResultMap =
+          new HashMap<>();
+
+      opResultMap.put(org.apache.zookeeper.OpResult.CreateResult.class, opResult -> {
+        org.apache.zookeeper.OpResult.CreateResult zkOpCreateResult = (org.apache.zookeeper.OpResult.CreateResult) opResult;
+        // Results of type OpCode.create are converted from the path alone; for any other
+        // result type the attached Stat is converted as well.
+        if (opResult.getType() == ZooDefs.OpCode.create) {
+          return new OpResult.CreateResult(zkOpCreateResult.getPath());
+        }
+        return new OpResult.CreateResult(zkOpCreateResult.getPath(), toMetaClientStat(zkOpCreateResult.getStat()));
+      });
+
+      opResultMap.put(org.apache.zookeeper.OpResult.DeleteResult.class, opResult -> new OpResult.DeleteResult());
+
+      opResultMap.put(org.apache.zookeeper.OpResult.GetDataResult.class, opResult -> {
+        org.apache.zookeeper.OpResult.GetDataResult zkOpGetDataResult = (org.apache.zookeeper.OpResult.GetDataResult) opResult;
+        return new OpResult.GetDataResult(zkOpGetDataResult.getData(), toMetaClientStat(zkOpGetDataResult.getStat()));
+      });
+
+      opResultMap.put(org.apache.zookeeper.OpResult.SetDataResult.class, opResult -> {
+        org.apache.zookeeper.OpResult.SetDataResult zkOpSetDataResult = (org.apache.zookeeper.OpResult.SetDataResult) opResult;
+        return new OpResult.SetDataResult(toMetaClientStat(zkOpSetDataResult.getStat()));
+      });
+
+      opResultMap.put(org.apache.zookeeper.OpResult.GetChildrenResult.class, opResult -> new OpResult.GetChildrenResult(
+          ((org.apache.zookeeper.OpResult.GetChildrenResult) opResult).getChildren()));
+
+      opResultMap.put(org.apache.zookeeper.OpResult.CheckResult.class, opResult -> new OpResult.CheckResult());
+
+      opResultMap.put(org.apache.zookeeper.OpResult.ErrorResult.class, opResult -> new OpResult.ErrorResult(
+          ((org.apache.zookeeper.OpResult.ErrorResult) opResult).getErr()));
+
+      return opResultMap;
+    }
+
+    // Builds a MetaClient Stat from a ZooKeeper Stat's ephemeral owner (as entry mode) and version.
+    private static MetaClientInterface.Stat toMetaClientStat(org.apache.zookeeper.data.Stat zkStat) {
+      return new MetaClientInterface.Stat(convertZkEntryMode(zkStat.getEphemeralOwner()), zkStat.getVersion());
+    }
+  }
+
+  // Accessor for the converter table held by OpResultMapHolder.
+  private static Map<Class<? extends org.apache.zookeeper.OpResult>, Function<org.apache.zookeeper.OpResult, OpResult>> getOpResultMap() {
+    return OpResultMapHolder.OPRESULTMAP;
+  }
+
+  /**
+   * Maps a ZooKeeper ephemeral-owner value to the MetaClient entry mode, based on the
+   * EphemeralType that {@code EphemeralType.get} returns for that value.
+   * @param ephemeralOwner ephemeral owner value passed to {@code EphemeralType.get}
+   * @return PERSISTENT for VOID, CONTAINER for CONTAINER, EPHEMERAL for NORMAL
+   * @throws IllegalArgumentException for any other EphemeralType
+   */
+  public static MetaClientInterface.EntryMode convertZkEntryMode(long ephemeralOwner) {
+    EphemeralType ownerType = EphemeralType.get(ephemeralOwner);
+    final MetaClientInterface.EntryMode mode;
+    switch (ownerType) {
+      case VOID:
+        mode = MetaClientInterface.EntryMode.PERSISTENT;
+        break;
+      case CONTAINER:
+        mode = MetaClientInterface.EntryMode.CONTAINER;
+        break;
+      case NORMAL:
+        mode = MetaClientInterface.EntryMode.EPHEMERAL;
+        break;
+      default:
+        // TODO: TTL is not supported now.
+        throw new IllegalArgumentException(ownerType + " is not supported.");
+    }
+    return mode;
+  }
+
+  /**
+   * Translates a ZkException into a MetaClientException that wraps it.
+   * A missing node, a bad version, a timeout and an interrupt each map to their dedicated
+   * MetaClient exception type; every other ZkException (including "node already exists")
+   * maps to the generic MetaClientException.
+   * @param e the ZkException to translate
+   * @return the MetaClientException constructed from {@code e}
+   */
+  public static MetaClientException translateZkExceptionToMetaclientException(ZkException e) {
+    // "No node" must key on ZkNoNodeException; a node-exists failure is the opposite condition
+    // and must not be reported as a missing node.
+    if (e instanceof ZkNoNodeException) {
+      return new MetaClientNoNodeException(e);
+    } else if (e instanceof ZkBadVersionException) {
+      return new MetaClientBadVersionException(e);
+    } else if (e instanceof ZkTimeoutException) {
+      return new MetaClientTimeoutException(e);
+    } else if (e instanceof ZkInterruptedException) {
+      return new MetaClientInterruptException(e);
+    } else {
+      return new MetaClientException(e);
+    }
+  }
+}
diff --git a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClient.java b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClient.java
index 553ce22e0..605f37c74 100644
--- a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClient.java
+++ b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClient.java
@@ -39,8 +39,11 @@ import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.commons.lang3.NotImplementedException;
 import org.apache.helix.metaclient.api.DataChangeListener;
 import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientConfig;
+import org.apache.helix.metaclient.api.Op;
+import org.apache.helix.metaclient.api.OpResult;
 import org.apache.helix.zookeeper.zkclient.IDefaultNameSpace;
 import org.apache.helix.zookeeper.zkclient.ZkServer;
+import org.apache.zookeeper.KeeperException;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
@@ -54,8 +57,13 @@ public class TestZkMetaClient {
   private static final String ZK_ADDR = "localhost:2183";
   private static final int DEFAULT_TIMEOUT_MS = 1000;
   private static final String ENTRY_STRING_VALUE = "test-value";
+  protected static final String ZK_SHARDING_KEY_PREFIX = "/sharding-key-0";
+  protected static String PARENT_PATH = ZK_SHARDING_KEY_PREFIX + "/RealmAwareZkClient";
+  protected static final String TEST_INVALID_PATH = ZK_SHARDING_KEY_PREFIX + "_invalid" + "/a/b/c";
+
   private final Object _syncObject = new Object();
 
+
   private ZkServer _zkServer;
 
   @BeforeClass
@@ -292,7 +300,6 @@ public class TestZkMetaClient {
       Assert.assertTrue(dataExpected.get());
     }
   }
-
   private static ZkMetaClient<String> createZkMetaClient() {
     ZkMetaClientConfig config =
         new ZkMetaClientConfig.ZkMetaClientConfigBuilder().setConnectionAddress(ZK_ADDR).build();
@@ -321,6 +328,72 @@ public class TestZkMetaClient {
     zkServer.start();
     return zkServer;
   }
+  
+  /**
+   * Test that zk multi works for zkmetaclient operations create,
+   * delete, and set: every op must yield a result of the matching type and the
+   * re-created node must exist afterwards.
+   */
+  @Test
+  public void testMultiOps() {
+    String test_name = "/test_multi_ops";
+
+    try(ZkMetaClient<String> zkMetaClient = createZkMetaClient()) {
+      zkMetaClient.connect();
+      zkMetaClient.create(ZK_SHARDING_KEY_PREFIX, ENTRY_STRING_VALUE);
+
+      //Create Nodes
+      List<Op> ops = Arrays.asList(
+          Op.create(PARENT_PATH, new byte[0], MetaClientInterface.EntryMode.PERSISTENT),
+          Op.create(PARENT_PATH + test_name, new byte[0], MetaClientInterface.EntryMode.PERSISTENT),
+          Op.delete(PARENT_PATH + test_name, -1),
+          Op.create(PARENT_PATH + test_name, new byte[0], MetaClientInterface.EntryMode.PERSISTENT),
+          Op.set(PARENT_PATH + test_name, new byte[0], -1));
+
+      //Execute transactional support on operations
+      List<OpResult> opResults = zkMetaClient.transactionOP(ops);
+
+      //Verify one result per op, each of the expected type
+      Assert.assertEquals(opResults.size(), ops.size());
+      Assert.assertTrue(opResults.get(0) instanceof OpResult.CreateResult);
+      Assert.assertTrue(opResults.get(1) instanceof OpResult.CreateResult);
+      Assert.assertTrue(opResults.get(2) instanceof OpResult.DeleteResult);
+      Assert.assertTrue(opResults.get(3) instanceof OpResult.CreateResult);
+      Assert.assertTrue(opResults.get(4) instanceof OpResult.SetDataResult);
+
+      //Verify paths have been created
+      MetaClientInterface.Stat entryStat = zkMetaClient.exists(PARENT_PATH + test_name);
+      Assert.assertNotNull(entryStat, "Path should have been created.");
+
+      //Cleanup
+      zkMetaClient.recursiveDelete(PARENT_PATH);
+      Assert.assertNull(zkMetaClient.exists(PARENT_PATH), "Parent Path should have been removed.");
+    }
+  }
+
+  /**
+   * Tests that calling multi with an op on an invalid path fails: transactionOP must throw,
+   * and PARENT_PATH, created by an earlier op in the same batch, must not exist afterwards.
+   */
+  @Test(dependsOnMethods = "testMultiOps")
+  public void testMultiFail() {
+    String test_name = "/test_multi_fail";
+    try(ZkMetaClient<String> zkMetaClient = createZkMetaClient()) {
+      zkMetaClient.connect();
+      //Create Nodes
+      List<Op> ops = Arrays.asList(
+          Op.create(PARENT_PATH, new byte[0], MetaClientInterface.EntryMode.PERSISTENT),
+          Op.create(PARENT_PATH + test_name, new byte[0], MetaClientInterface.EntryMode.PERSISTENT),
+          Op.create(TEST_INVALID_PATH, new byte[0], MetaClientInterface.EntryMode.PERSISTENT));
+
+      try {
+        zkMetaClient.transactionOP(ops);
+        Assert.fail("Should have thrown an exception. Cannot run multi on incorrect path.");
+      } catch (Exception e) {
+        MetaClientInterface.Stat entryStat = zkMetaClient.exists(PARENT_PATH);
+        Assert.assertNull(entryStat);
+      }
+    }
+  }
 
   private class MockDataChangeListener implements DataChangeListener {
     private final AtomicInteger _triggeredCount = new AtomicInteger(0);