You are viewing a plain-text version of this content. The link to its canonical version was not preserved in this plain-text copy.
Posted to commits@pinot.apache.org by jl...@apache.org on 2019/01/07 18:04:40 UTC

[incubator-pinot] branch master updated: Use ZkCacheBaseDataAccessor to cache instance configs in PinotHelixResourceManager (#3633)

This is an automated email from the ASF dual-hosted git repository.

jlli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new c25e5ea  Use ZkCacheBaseDataAccessor to cache instance configs in PinotHelixResourceManager (#3633)
c25e5ea is described below

commit c25e5eaaab2478cf2a07d5bb438ee14e784c4113
Author: Jialiang Li <jl...@linkedin.com>
AuthorDate: Mon Jan 7 10:04:35 2019 -0800

    Use ZkCacheBaseDataAccessor to cache instance configs in PinotHelixResourceManager (#3633)
    
    * Use ZkCacheBaseDataAccessor to cache instance configs in PinotHelixResourceManager;
    * Unit tests added.
---
 .../helix/core/PinotHelixResourceManager.java      | 35 ++++----
 .../helix/core/PinotHelixResourceManagerTest.java  | 96 ++++++++++++++++++++++
 2 files changed, 117 insertions(+), 14 deletions(-)

diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/PinotHelixResourceManager.java
index 423db8c..d359c6f 100644
--- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -84,6 +84,7 @@ import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import org.apache.commons.configuration.Configuration;
 import org.apache.helix.AccessOption;
+import org.apache.helix.BaseDataAccessor;
 import org.apache.helix.ClusterMessagingService;
 import org.apache.helix.Criteria;
 import org.apache.helix.HelixAdmin;
@@ -93,7 +94,10 @@ import org.apache.helix.HelixManager;
 import org.apache.helix.InstanceType;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.PropertyPathBuilder;
 import org.apache.helix.ZNRecord;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.manager.zk.ZkCacheBaseDataAccessor;
 import org.apache.helix.model.CurrentState;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
@@ -128,6 +132,7 @@ public class PinotHelixResourceManager {
   private HelixAdmin _helixAdmin;
   private ZkHelixPropertyStore<ZNRecord> _propertyStore;
   private HelixDataAccessor _helixDataAccessor;
+  private ZkCacheBaseDataAccessor<ZNRecord> _cacheInstanceConfigsDataAccessor;
   private Builder _keyBuilder;
   private SegmentDeletionManager _segmentDeletionManager;
   private TableRebalancer _tableRebalancer;
@@ -168,6 +173,13 @@ public class PinotHelixResourceManager {
     _helixAdmin = _helixZkManager.getClusterManagmentTool();
     _propertyStore = _helixZkManager.getHelixPropertyStore();
     _helixDataAccessor = _helixZkManager.getHelixDataAccessor();
+    // Cache instance zk paths.
+    BaseDataAccessor<ZNRecord> baseDataAccessor = _helixDataAccessor.getBaseDataAccessor();
+
+    String instanceConfigs = PropertyPathBuilder.instanceConfig(_helixClusterName);
+    _cacheInstanceConfigsDataAccessor =
+        new ZkCacheBaseDataAccessor<>((ZkBaseDataAccessor<ZNRecord>) baseDataAccessor, instanceConfigs,
+            null, Collections.singletonList(instanceConfigs));
     _keyBuilder = _helixDataAccessor.keyBuilder();
     _segmentDeletionManager = new SegmentDeletionManager(_dataDir, _helixAdmin, _helixClusterName, _propertyStore);
     ZKMetadataProvider.setClusterTenantIsolationEnabled(_propertyStore, _isSingleTenantCluster);
@@ -249,23 +261,17 @@ public class PinotHelixResourceManager {
    */
   @Nonnull
   public List<String> getAllInstances() {
-    return _helixAdmin.getInstancesInCluster(_helixClusterName);
-  }
-
-  /**
-   * Get key builder
-   * @return
-   */
-  @Nonnull
-  public Builder getKeyBuilder() {
-    return _keyBuilder;
+    return _cacheInstanceConfigsDataAccessor.getChildNames("/", AccessOption.PERSISTENT);
   }
 
   /**
    * Returns the config for all the Helix instances in the cluster.
    */
   public List<InstanceConfig> getAllHelixInstanceConfigs() {
-    return HelixHelper.getInstanceConfigs(_helixZkManager);
+    List<ZNRecord> znRecords = _cacheInstanceConfigsDataAccessor.getChildren("/", null, AccessOption.PERSISTENT);
+    List<InstanceConfig> instanceConfigs = new ArrayList<>(znRecords.size());
+    znRecords.forEach(znRecord -> instanceConfigs.add(new InstanceConfig(znRecord)));
+    return instanceConfigs;
   }
 
   /**
@@ -276,7 +282,8 @@ public class PinotHelixResourceManager {
    */
   @Nonnull
   public InstanceConfig getHelixInstanceConfig(@Nonnull String instanceId) {
-    return _helixAdmin.getInstanceConfig(_helixClusterName, instanceId);
+    ZNRecord znRecord = _cacheInstanceConfigsDataAccessor.get("/" + instanceId, null, AccessOption.PERSISTENT);
+    return new InstanceConfig(znRecord);
   }
 
   /**
@@ -1519,7 +1526,7 @@ public class PinotHelixResourceManager {
 
     ClusterMessagingService messagingService = _helixZkManager.getMessagingService();
     LOGGER.info("Sending timeboundary refresh message for segment {} of table {}:{} to recipients {}", segmentName,
-            rawTableName, refreshMessage, recipientCriteria);
+        rawTableName, refreshMessage, recipientCriteria);
     // Helix sets the timeoutMs argument specified in 'send' call as the processing timeout of the message.
     int nMsgsSent = messagingService.send(recipientCriteria, refreshMessage, null, timeoutMs);
     if (nMsgsSent > 0) {
@@ -1529,7 +1536,7 @@ public class PinotHelixResourceManager {
       // May be the case when none of the brokers are up yet. That is OK, because when they come up they will get
       // the latest time boundary info.
       LOGGER.warn("Unable to send timeboundary refresh message for {} of table {}, nMsgs={}", segmentName,
-              offlineTableName, nMsgsSent);
+          offlineTableName, nMsgsSent);
     }
   }
 
diff --git a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/PinotHelixResourceManagerTest.java b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/PinotHelixResourceManagerTest.java
index f93bf73..fe0c808 100644
--- a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/PinotHelixResourceManagerTest.java
+++ b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/PinotHelixResourceManagerTest.java
@@ -30,9 +30,16 @@ import com.linkedin.pinot.common.utils.ZkStarter;
 import com.linkedin.pinot.controller.ControllerConf;
 import com.linkedin.pinot.controller.helix.ControllerRequestBuilderUtil;
 import com.linkedin.pinot.controller.helix.ControllerTest;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Random;
 import java.util.Set;
+import org.I0Itec.zkclient.ZkClient;
+import org.apache.helix.PropertyPathBuilder;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
 import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
@@ -47,6 +54,8 @@ public class PinotHelixResourceManagerTest extends ControllerTest {
   private static final String TABLE_NAME = "testTable";
   private static final String OFFLINE_TABLE_NAME = TableNameBuilder.OFFLINE.tableNameWithType(TABLE_NAME);
   private static final String REALTIME_TABLE_NAME = TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME);
+  private static final int CONNECTION_TIMEOUT_IN_MILLISECOND = 10_000;
+  private static final int MAX_TIMEOUT_IN_MILLISECOND = 5_000;
 
   private final String _helixClusterName = getHelixClusterName();
 
@@ -79,6 +88,87 @@ public class PinotHelixResourceManagerTest extends ControllerTest {
   }
 
   @Test
+  public void testGetInstanceConfigs() throws Exception {
+    Set<String> servers = _helixResourceManager.getAllInstancesForServerTenant(SERVER_TENANT_NAME);
+    for (String server : servers) {
+      InstanceConfig cachedInstanceConfig = _helixResourceManager.getHelixInstanceConfig(server);
+      InstanceConfig realInstanceConfig = _helixAdmin.getInstanceConfig(_helixClusterName, server);
+      Assert.assertEquals(cachedInstanceConfig, realInstanceConfig);
+    }
+
+    ZkClient zkClient = new ZkClient(_helixResourceManager.getHelixZkURL(), CONNECTION_TIMEOUT_IN_MILLISECOND,
+        CONNECTION_TIMEOUT_IN_MILLISECOND, new ZNRecordSerializer());
+
+    modifyExistingInstanceConfig(zkClient);
+    addAndRemoveNewInstanceConfig(zkClient);
+
+    zkClient.close();
+  }
+
+  private void modifyExistingInstanceConfig(ZkClient zkClient) throws InterruptedException {
+    String instanceName = "Server_localhost_" + new Random().nextInt(NUM_INSTANCES);
+    String instanceConfigPath = PropertyPathBuilder.instanceConfig(_helixClusterName, instanceName);
+    Assert.assertTrue(zkClient.exists(instanceConfigPath));
+    ZNRecord znRecord = zkClient.readData(instanceConfigPath, null);
+
+    InstanceConfig cachedInstanceConfig = _helixResourceManager.getHelixInstanceConfig(instanceName);
+    String originalPort = cachedInstanceConfig.getPort();
+    Assert.assertNotNull(originalPort);
+    String newPort = Long.toString(System.currentTimeMillis());
+    Assert.assertTrue(!newPort.equals(originalPort));
+
+    // Set new port to this instance config.
+    znRecord.setSimpleField(InstanceConfig.InstanceConfigProperty.HELIX_PORT.toString(), newPort);
+    zkClient.writeData(instanceConfigPath, znRecord);
+
+    long maxTime = System.currentTimeMillis() + MAX_TIMEOUT_IN_MILLISECOND;
+    InstanceConfig latestCachedInstanceConfig = _helixResourceManager.getHelixInstanceConfig(instanceName);
+    String latestPort = latestCachedInstanceConfig.getPort();
+    while (!newPort.equals(latestPort) && System.currentTimeMillis() < maxTime) {
+      Thread.sleep(100L);
+      latestCachedInstanceConfig = _helixResourceManager.getHelixInstanceConfig(instanceName);
+      latestPort = latestCachedInstanceConfig.getPort();
+    }
+    Assert.assertTrue(System.currentTimeMillis() < maxTime, "Timeout when waiting for adding instance config");
+
+    // Set original port back to this instance config.
+    znRecord.setSimpleField(InstanceConfig.InstanceConfigProperty.HELIX_PORT.toString(), originalPort);
+    zkClient.writeData(instanceConfigPath, znRecord);
+  }
+
+  private void addAndRemoveNewInstanceConfig(ZkClient zkClient) throws Exception {
+    int biggerRandomNumber = NUM_INSTANCES + new Random().nextInt(NUM_INSTANCES);
+    String instanceName = "Server_localhost_" + String.valueOf(biggerRandomNumber);
+    String instanceConfigPath = PropertyPathBuilder.instanceConfig(_helixClusterName, instanceName);
+    Assert.assertFalse(zkClient.exists(instanceConfigPath));
+    List<String> instances = _helixResourceManager.getAllInstances();
+    Assert.assertFalse(instances.contains(instanceName));
+
+    // Add new ZNode.
+    ZNRecord znRecord = new ZNRecord(instanceName);
+    zkClient.createPersistent(instanceConfigPath, znRecord);
+
+    List<String> latestAllInstances = _helixResourceManager.getAllInstances();
+    long maxTime = System.currentTimeMillis() + MAX_TIMEOUT_IN_MILLISECOND;
+    while (!latestAllInstances.contains(instanceName) && System.currentTimeMillis() < maxTime) {
+      Thread.sleep(100L);
+      latestAllInstances = _helixResourceManager.getAllInstances();
+    }
+    Assert.assertTrue(System.currentTimeMillis() < maxTime, "Timeout when waiting for adding instance config");
+
+    // Remove new ZNode.
+    zkClient.delete(instanceConfigPath);
+
+    latestAllInstances = _helixResourceManager.getAllInstances();
+    maxTime = System.currentTimeMillis() + MAX_TIMEOUT_IN_MILLISECOND;
+    while (latestAllInstances.contains(instanceName) && System.currentTimeMillis() < maxTime) {
+      Thread.sleep(100L);
+      latestAllInstances = _helixResourceManager.getAllInstances();
+    }
+    Assert.assertTrue(System.currentTimeMillis() < maxTime, "Timeout when waiting for removing instance config");
+  }
+
+  @Test
   public void testRebuildBrokerResourceFromHelixTags() throws Exception {
     // Create broker tenant on 3 Brokers
     Tenant brokerTenant =
@@ -165,6 +255,12 @@ public class PinotHelixResourceManagerTest extends ControllerTest {
     }
   }
 
+  @Test
+  public void testGetDataInstanceAdminEndpoints() {
+    Set<String> fakeInstances = new HashSet<>();
+    new Random().nextInt(NUM_INSTANCES);
+  }
+
 
   @AfterClass
   public void tearDown() {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org