You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by hu...@apache.org on 2020/08/14 18:14:01 UTC

[helix] 06/12: Implement ZkRoutingDataReader

This is an automated email from the ASF dual-hosted git repository.

hulee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git

commit d3477bed870ba7778d55ed49ea14e30a09c88cdc
Author: Hunter Lee <hu...@linkedin.com>
AuthorDate: Wed Jul 22 11:10:42 2020 -0700

    Implement ZkRoutingDataReader
    
    In order to allow certain users to use ZK as the sole routing data source, we add ZkRoutingDataReader that transiently creates a ZK connection to read the routing data from the routing ZK.
---
 .../zookeeper/routing/ZkRoutingDataReader.java     | 81 +++++++++++++++++++++-
 .../helix/zookeeper/constant/TestConstants.java    | 16 +++--
 .../zookeeper/routing/TestZkRoutingDataReader.java | 75 +++++++++++++++++++-
 3 files changed, 164 insertions(+), 8 deletions(-)

diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/routing/ZkRoutingDataReader.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/routing/ZkRoutingDataReader.java
index 3db3b55..48c8604 100644
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/routing/ZkRoutingDataReader.java
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/routing/ZkRoutingDataReader.java
@@ -1,4 +1,83 @@
 package org.apache.helix.zookeeper.routing;
 
-public class ZkRoutingDataReader {
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.msdcommon.constant.MetadataStoreRoutingConstants;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.helix.zookeeper.datamodel.serializer.ZNRecordSerializer;
+import org.apache.helix.zookeeper.exception.MultiZkException;
+import org.apache.helix.zookeeper.impl.client.ZkClient;
+import org.apache.helix.zookeeper.zkclient.exception.ZkNoNodeException;
+
+
+/**
+ * Zk-based RoutingDataReader that establishes a ZK connection to the routing ZK to fetch routing
+ * data.
+ * The reading of routing data by nature should only be performed in cases of a Helix client
+ * initialization or routing data reset. That means we do not have to maintain an active ZK
+ * connection. To minimize the number of client-side ZK connections, ZkRoutingDataReader establishes
+ * a ZK session temporarily only to read from ZK afresh and closes sessions upon read completion.
+ */
+public class ZkRoutingDataReader implements RoutingDataReader {
+
+  /**
+   * Returns a map form of metadata store routing data.
+   * The map fields stand for metadata store realm address (key), and a corresponding list of ZK
+   * path sharding keys (key).
+   * @param endpoint
+   * @return
+   */
+  @Override
+  public Map<String, List<String>> getRawRoutingData(String endpoint) {
+    ZkClient zkClient =
+        new ZkClient.Builder().setZkServer(endpoint).setZkSerializer(new ZNRecordSerializer())
+            .build();
+
+    Map<String, List<String>> routingData = new HashMap<>();
+    List<String> allRealmAddresses;
+    try {
+      allRealmAddresses = zkClient.getChildren(MetadataStoreRoutingConstants.ROUTING_DATA_PATH);
+    } catch (ZkNoNodeException e) {
+      throw new MultiZkException(
+          "Routing data directory ZNode " + MetadataStoreRoutingConstants.ROUTING_DATA_PATH
+              + " does not exist. Routing ZooKeeper address: " + endpoint);
+    }
+    if (allRealmAddresses != null) {
+      for (String realmAddress : allRealmAddresses) {
+        ZNRecord record = zkClient
+            .readData(MetadataStoreRoutingConstants.ROUTING_DATA_PATH + "/" + realmAddress, true);
+        if (record != null) {
+          List<String> shardingKeys =
+              record.getListField(MetadataStoreRoutingConstants.ZNRECORD_LIST_FIELD_KEY);
+          routingData
+              .put(realmAddress, shardingKeys != null ? shardingKeys : Collections.emptyList());
+        }
+      }
+    }
+
+    zkClient.close();
+    return routingData;
+  }
 }
diff --git a/zookeeper-api/src/test/java/org/apache/helix/zookeeper/constant/TestConstants.java b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/constant/TestConstants.java
index a217245..d6c6713 100644
--- a/zookeeper-api/src/test/java/org/apache/helix/zookeeper/constant/TestConstants.java
+++ b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/constant/TestConstants.java
@@ -20,6 +20,7 @@ package org.apache.helix.zookeeper.constant;
  */
 
 import java.util.Collection;
+import java.util.List;
 import java.util.Map;
 
 import com.google.common.collect.ImmutableList;
@@ -34,12 +35,15 @@ public class TestConstants {
   public static final String ZK_PREFIX = "localhost:";
   public static final int ZK_START_PORT = 2127;
 
+  public static final List<String> TEST_KEY_LIST_1 =
+      ImmutableList.of("/sharding-key-0", "/sharding-key-1", "/sharding-key-2");
+  public static final List<String> TEST_KEY_LIST_2 =
+      ImmutableList.of("/sharding-key-3", "/sharding-key-4", "/sharding-key-5");
+  public static final List<String> TEST_KEY_LIST_3 =
+      ImmutableList.of("/sharding-key-6", "/sharding-key-7", "/sharding-key-8");
+
   // Based on the ZK hostname constants, construct a set of fake routing data mappings
   public static final Map<String, Collection<String>> FAKE_ROUTING_DATA = ImmutableMap
-      .of(ZK_PREFIX + ZK_START_PORT,
-          ImmutableList.of("/sharding-key-0", "/sharding-key-1", "/sharding-key-2"),
-          ZK_PREFIX + (ZK_START_PORT + 1),
-          ImmutableList.of("/sharding-key-3", "/sharding-key-4", "/sharding-key-5"),
-          ZK_PREFIX + (ZK_START_PORT + 2),
-          ImmutableList.of("/sharding-key-6", "/sharding-key-7", "/sharding-key-8"));
+      .of(ZK_PREFIX + ZK_START_PORT, TEST_KEY_LIST_1, ZK_PREFIX + (ZK_START_PORT + 1),
+          TEST_KEY_LIST_2, ZK_PREFIX + (ZK_START_PORT + 2), TEST_KEY_LIST_3);
 }
diff --git a/zookeeper-api/src/test/java/org/apache/helix/zookeeper/routing/TestZkRoutingDataReader.java b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/routing/TestZkRoutingDataReader.java
index 68f8df4..cc1936b 100644
--- a/zookeeper-api/src/test/java/org/apache/helix/zookeeper/routing/TestZkRoutingDataReader.java
+++ b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/routing/TestZkRoutingDataReader.java
@@ -1,4 +1,77 @@
 package org.apache.helix.zookeeper.routing;
 
-public class TestZkRoutingDataReader {
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.msdcommon.constant.MetadataStoreRoutingConstants;
+import org.apache.helix.zookeeper.constant.TestConstants;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.helix.zookeeper.datamodel.serializer.ZNRecordSerializer;
+import org.apache.helix.zookeeper.impl.ZkTestBase;
+import org.apache.helix.zookeeper.zkclient.ZkClient;
+import org.apache.helix.zookeeper.zkclient.ZkServer;
+import org.apache.zookeeper.CreateMode;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+public class TestZkRoutingDataReader extends ZkTestBase {
+  private static final String ROUTING_ZK_ADDR = "localhost:2358";
+  private ZkServer _routingZk;
+
+  @BeforeClass
+  public void beforeClass() {
+    // Start a separate ZK for isolation
+    _routingZk = startZkServer(ROUTING_ZK_ADDR);
+
+    // Create ZK realm routing data ZNRecord
+    ZNRecord znRecord = new ZNRecord(ROUTING_ZK_ADDR);
+    znRecord.setListField(MetadataStoreRoutingConstants.ZNRECORD_LIST_FIELD_KEY,
+        TestConstants.TEST_KEY_LIST_1);
+
+    // Write raw routing data to ZK
+    ZkClient zkClient = _routingZk.getZkClient();
+    zkClient.setZkSerializer(new ZNRecordSerializer());
+    zkClient.create(MetadataStoreRoutingConstants.ROUTING_DATA_PATH, null, CreateMode.PERSISTENT);
+    zkClient
+        .create(MetadataStoreRoutingConstants.ROUTING_DATA_PATH + "/" + ROUTING_ZK_ADDR, znRecord,
+            CreateMode.PERSISTENT);
+  }
+
+  @AfterClass
+  public void afterClass() {
+    _routingZk.shutdown();
+  }
+
+  @Test
+  public void testGetRawRoutingData() {
+    Map<String, List<String>> rawRoutingData =
+        new ZkRoutingDataReader().getRawRoutingData(ROUTING_ZK_ADDR);
+
+    // Check that the returned content matches up with what we expect (1 realm, 3 keys)
+    Assert.assertEquals(rawRoutingData.size(), 1);
+    Assert.assertTrue(rawRoutingData.containsKey(ROUTING_ZK_ADDR));
+    Assert.assertEquals(rawRoutingData.get(ROUTING_ZK_ADDR), TestConstants.TEST_KEY_LIST_1);
+  }
 }