You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by jl...@apache.org on 2019/01/03 00:20:50 UTC

[incubator-pinot] 01/01: Use ZkCacheBaseDataAccessor to cache instance configs in PinotHelixResourceManager

This is an automated email from the ASF dual-hosted git repository.

jlli pushed a commit to branch use-zk-cache-base-data-accessor-to-get-instance-config
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 9c239e9c580d7d96c6dc12c63866baf6d5ebbad2
Author: Jack Li(Analytics Engineering) <jl...@jlli-mn1.linkedin.biz>
AuthorDate: Wed Jan 2 16:20:19 2019 -0800

    Use ZkCacheBaseDataAccessor to cache instance configs in PinotHelixResourceManager
---
 .../helix/core/PinotHelixResourceManager.java      | 53 ++++++++++++++++++----
 .../controller/helix/PinotResourceManagerTest.java | 10 ++++
 2 files changed, 53 insertions(+), 10 deletions(-)

diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/PinotHelixResourceManager.java
index 423db8c..f3e936f 100644
--- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -84,6 +84,7 @@ import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import org.apache.commons.configuration.Configuration;
 import org.apache.helix.AccessOption;
+import org.apache.helix.BaseDataAccessor;
 import org.apache.helix.ClusterMessagingService;
 import org.apache.helix.Criteria;
 import org.apache.helix.HelixAdmin;
@@ -93,7 +94,10 @@ import org.apache.helix.HelixManager;
 import org.apache.helix.InstanceType;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.PropertyPathBuilder;
 import org.apache.helix.ZNRecord;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.manager.zk.ZkCacheBaseDataAccessor;
 import org.apache.helix.model.CurrentState;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
@@ -128,6 +132,7 @@ public class PinotHelixResourceManager {
   private HelixAdmin _helixAdmin;
   private ZkHelixPropertyStore<ZNRecord> _propertyStore;
   private HelixDataAccessor _helixDataAccessor;
+  private ZkCacheBaseDataAccessor<ZNRecord> _cacheInstanceConfigsDataAccessor;
   private Builder _keyBuilder;
   private SegmentDeletionManager _segmentDeletionManager;
   private TableRebalancer _tableRebalancer;
@@ -168,6 +173,16 @@ public class PinotHelixResourceManager {
     _helixAdmin = _helixZkManager.getClusterManagmentTool();
     _propertyStore = _helixZkManager.getHelixPropertyStore();
     _helixDataAccessor = _helixZkManager.getHelixDataAccessor();
+    // Cache instance zk paths.
+    BaseDataAccessor<ZNRecord> baseDataAccessor = _helixDataAccessor.getBaseDataAccessor();
+    if (baseDataAccessor instanceof ZkBaseDataAccessor) {
+      String instanceConfigs = PropertyPathBuilder.instanceConfig(_helixClusterName);
+      _cacheInstanceConfigsDataAccessor =
+          new ZkCacheBaseDataAccessor<>((ZkBaseDataAccessor<ZNRecord>) baseDataAccessor, instanceConfigs,
+              Collections.singletonList(instanceConfigs), Collections.singletonList(instanceConfigs));
+    } else {
+      _cacheInstanceConfigsDataAccessor = null;
+    }
     _keyBuilder = _helixDataAccessor.keyBuilder();
     _segmentDeletionManager = new SegmentDeletionManager(_dataDir, _helixAdmin, _helixClusterName, _propertyStore);
     ZKMetadataProvider.setClusterTenantIsolationEnabled(_propertyStore, _isSingleTenantCluster);
@@ -261,6 +276,15 @@ public class PinotHelixResourceManager {
     return _keyBuilder;
   }
 
+  /**
+   * Returns the cached accessor rooted at the instance-config path of this cluster, or {@code null} when the
+   * Helix base data accessor is not a {@code ZkBaseDataAccessor} and no cache could be created.
+   */
+  @Nullable
+  public ZkCacheBaseDataAccessor<ZNRecord> getCacheInstanceConfigsDataAccessor() {
+    return _cacheInstanceConfigsDataAccessor;
+  }
+
   /**
    * Returns the config for all the Helix instances in the cluster.
    */
@@ -276,6 +295,15 @@ public class PinotHelixResourceManager {
    */
   @Nonnull
   public InstanceConfig getHelixInstanceConfig(@Nonnull String instanceId) {
+    if (_cacheInstanceConfigsDataAccessor != null) {
+      // Logged at debug level: this lookup runs once per instance and would flood the log at info level.
+      LOGGER.debug("Get instance config for instance {} from cluster {}.", instanceId, _helixClusterName);
+      ZNRecord znRecord = _cacheInstanceConfigsDataAccessor.get("/" + instanceId, null, AccessOption.PERSISTENT);
+      // A null record means the cached accessor has no config for this instance. Never wrap null in an
+      // InstanceConfig; fall through to the HelixAdmin lookup so unknown instances are handled as before.
+      if (znRecord != null) {
+        return new InstanceConfig(znRecord);
+      }
+    }
     return _helixAdmin.getInstanceConfig(_helixClusterName, instanceId);
   }
 
@@ -1519,7 +1543,7 @@ public class PinotHelixResourceManager {
 
     ClusterMessagingService messagingService = _helixZkManager.getMessagingService();
     LOGGER.info("Sending timeboundary refresh message for segment {} of table {}:{} to recipients {}", segmentName,
-            rawTableName, refreshMessage, recipientCriteria);
+        rawTableName, refreshMessage, recipientCriteria);
     // Helix sets the timeoutMs argument specified in 'send' call as the processing timeout of the message.
     int nMsgsSent = messagingService.send(recipientCriteria, refreshMessage, null, timeoutMs);
     if (nMsgsSent > 0) {
@@ -1529,7 +1553,7 @@ public class PinotHelixResourceManager {
       // May be the case when none of the brokers are up yet. That is OK, because when they come up they will get
       // the latest time boundary info.
       LOGGER.warn("Unable to send timeboundary refresh message for {} of table {}, nMsgs={}", segmentName,
-              offlineTableName, nMsgsSent);
+          offlineTableName, nMsgsSent);
     }
   }
 
@@ -2189,28 +2213,38 @@ public class PinotHelixResourceManager {
 
   /*
    * Uncomment and use for testing on a real cluster
   public static void main(String[] args) throws Exception {
     final String testZk = "test1.zk.com:12345/pinot-cluster";
     final String realZk = "test2.zk.com:12345/pinot-cluster";
     final String zkURL = realZk;
     final String clusterName = "mpSprintDemoCluster";
     final String helixClusterName = clusterName;
     final String controllerInstanceId = "local-hostname";
     final String localDiskDir = "/var/tmp/Controller";
     final long externalViewOnlineToOfflineTimeoutMillis = 100L;
     final boolean isSingleTenantCluster = false;
     final boolean isUpdateStateModel = false;
     MetricsRegistry metricsRegistry = new MetricsRegistry();
     final boolean dryRun = true;
     final String tableName = "testTable";
     final TableType tableType = TableType.OFFLINE;
     PinotHelixResourceManager helixResourceManager =
         new PinotHelixResourceManager(zkURL, helixClusterName, controllerInstanceId, localDiskDir,
-            externalViewOnlineToOfflineTimeoutMillis, isSingleTenantCluster, isUpdateStateModel);
+            externalViewOnlineToOfflineTimeoutMillis, isSingleTenantCluster, isUpdateStateModel, false);
     helixResourceManager.start();
+
+    // Print the admin endpoint resolved for each listed server instance (replace with real instance ids).
+    Set<String> instances = new HashSet<>();
+    instances.add("Server_host1.example.com_8001");
+    instances.add("Server_host2.example.com_8001");
+    BiMap<String, String> map = helixResourceManager.getDataInstanceAdminEndpoints(instances);
+    for (Map.Entry<String, String> entry : map.entrySet()) {
+      System.out.println(entry.getKey() + " " + entry.getValue());
+    }
+
     ZNRecord record = helixResourceManager.rebalanceTable(tableName, dryRun, tableType);
     ObjectMapper mapper = new ObjectMapper();
     System.out.println(mapper.writerWithDefaultPrettyPrinter().writeValueAsString(record));
   }
    */
 }
diff --git a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/PinotResourceManagerTest.java b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/PinotResourceManagerTest.java
index e3c59d4..08c7bc6 100644
--- a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/PinotResourceManagerTest.java
+++ b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/PinotResourceManagerTest.java
@@ -26,9 +26,12 @@ import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
+import org.apache.helix.AccessOption;
 import org.apache.helix.HelixAdmin;
+import org.apache.helix.ZNRecord;
 import org.apache.helix.manager.zk.ZkClient;
 import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
@@ -66,6 +69,13 @@ public class PinotResourceManagerTest {
     Assert.assertEquals(_helixAdmin.getInstancesInClusterWithTag(HELIX_CLUSTER_NAME, "DefaultTenant_REALTIME").size(),
         1);
 
+    // Verifying instance configs. TestNG's assertEquals takes (actual, expected), as the assertions above do.
+    ZNRecord znRecord = _pinotHelixResourceManager.getCacheInstanceConfigsDataAccessor()
+        .get("/Server_localhost_0", null, AccessOption.PERSISTENT);
+    Assert.assertNotNull(znRecord);
+    Assert.assertEquals(znRecord.getId(), "Server_localhost_0");
+    Assert.assertEquals(new InstanceConfig(znRecord).getHostName(), "Server_localhost");
+
     // Adding table
     TableConfig tableConfig =
         new TableConfig.Builder(CommonConstants.Helix.TableType.OFFLINE).setTableName(TABLE_NAME).build();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org