You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by jl...@apache.org on 2019/01/03 00:20:50 UTC
[incubator-pinot] 01/01: Use ZkCacheBaseDataAccessor to cache instance configs in PinotHelixResourceManager
This is an automated email from the ASF dual-hosted git repository.
jlli pushed a commit to branch use-zk-cache-base-data-accessor-to-get-instance-config
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 9c239e9c580d7d96c6dc12c63866baf6d5ebbad2
Author: Jack Li(Analytics Engineering) <jl...@jlli-mn1.linkedin.biz>
AuthorDate: Wed Jan 2 16:20:19 2019 -0800
Use ZkCacheBaseDataAccessor to cache instance configs in PinotHelixResourceManager
---
.../helix/core/PinotHelixResourceManager.java | 53 ++++++++++++++++++----
.../controller/helix/PinotResourceManagerTest.java | 10 ++++
2 files changed, 53 insertions(+), 10 deletions(-)
diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/PinotHelixResourceManager.java
index 423db8c..f3e936f 100644
--- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -84,6 +84,7 @@ import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.commons.configuration.Configuration;
import org.apache.helix.AccessOption;
+import org.apache.helix.BaseDataAccessor;
import org.apache.helix.ClusterMessagingService;
import org.apache.helix.Criteria;
import org.apache.helix.HelixAdmin;
@@ -93,7 +94,10 @@ import org.apache.helix.HelixManager;
import org.apache.helix.InstanceType;
import org.apache.helix.PropertyKey;
import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.PropertyPathBuilder;
import org.apache.helix.ZNRecord;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.manager.zk.ZkCacheBaseDataAccessor;
import org.apache.helix.model.CurrentState;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
@@ -128,6 +132,7 @@ public class PinotHelixResourceManager {
private HelixAdmin _helixAdmin;
private ZkHelixPropertyStore<ZNRecord> _propertyStore;
private HelixDataAccessor _helixDataAccessor;
+ private ZkCacheBaseDataAccessor<ZNRecord> _cacheInstanceConfigsDataAccessor;
private Builder _keyBuilder;
private SegmentDeletionManager _segmentDeletionManager;
private TableRebalancer _tableRebalancer;
@@ -168,6 +173,16 @@ public class PinotHelixResourceManager {
_helixAdmin = _helixZkManager.getClusterManagmentTool();
_propertyStore = _helixZkManager.getHelixPropertyStore();
_helixDataAccessor = _helixZkManager.getHelixDataAccessor();
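+ // Set up a caching accessor over this cluster's instance-config ZK path (only when the base accessor is a ZkBaseDataAccessor).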
+ BaseDataAccessor<ZNRecord> baseDataAccessor = _helixDataAccessor.getBaseDataAccessor();
+ if (baseDataAccessor instanceof ZkBaseDataAccessor) {
+ String instanceConfigs = PropertyPathBuilder.instanceConfig(_helixClusterName);
+ _cacheInstanceConfigsDataAccessor =
+ new ZkCacheBaseDataAccessor<>((ZkBaseDataAccessor<ZNRecord>) baseDataAccessor, instanceConfigs,
+ Collections.singletonList(instanceConfigs), Collections.singletonList(instanceConfigs));
+ } else {
+ _cacheInstanceConfigsDataAccessor = null;
+ }
_keyBuilder = _helixDataAccessor.keyBuilder();
_segmentDeletionManager = new SegmentDeletionManager(_dataDir, _helixAdmin, _helixClusterName, _propertyStore);
ZKMetadataProvider.setClusterTenantIsolationEnabled(_propertyStore, _isSingleTenantCluster);
@@ -261,6 +276,10 @@ public class PinotHelixResourceManager {
return _keyBuilder;
}
+ public ZkCacheBaseDataAccessor<ZNRecord> getCacheInstanceConfigsDataAccessor() {
+ return _cacheInstanceConfigsDataAccessor;
+ }
+
/**
* Returns the config for all the Helix instances in the cluster.
*/
@@ -276,6 +295,11 @@ public class PinotHelixResourceManager {
*/
@Nonnull
public InstanceConfig getHelixInstanceConfig(@Nonnull String instanceId) {
+ if (_cacheInstanceConfigsDataAccessor != null) {
+ LOGGER.info("Get instance config for instance {} from cluster {}.", instanceId, _helixClusterName);
+ ZNRecord znRecord = _cacheInstanceConfigsDataAccessor.get("/" + instanceId, null, AccessOption.PERSISTENT);
+ return new InstanceConfig(znRecord);
+ }
return _helixAdmin.getInstanceConfig(_helixClusterName, instanceId);
}
@@ -1519,7 +1543,7 @@ public class PinotHelixResourceManager {
ClusterMessagingService messagingService = _helixZkManager.getMessagingService();
LOGGER.info("Sending timeboundary refresh message for segment {} of table {}:{} to recipients {}", segmentName,
- rawTableName, refreshMessage, recipientCriteria);
+ rawTableName, refreshMessage, recipientCriteria);
// Helix sets the timeoutMs argument specified in 'send' call as the processing timeout of the message.
int nMsgsSent = messagingService.send(recipientCriteria, refreshMessage, null, timeoutMs);
if (nMsgsSent > 0) {
@@ -1529,7 +1553,7 @@ public class PinotHelixResourceManager {
// May be the case when none of the brokers are up yet. That is OK, because when they come up they will get
// the latest time boundary info.
LOGGER.warn("Unable to send timeboundary refresh message for {} of table {}, nMsgs={}", segmentName,
- offlineTableName, nMsgsSent);
+ offlineTableName, nMsgsSent);
}
}
@@ -2189,28 +2213,37 @@ public class PinotHelixResourceManager {
/*
* Uncomment and use for testing on a real cluster
+ * */
public static void main(String[] args) throws Exception {
final String testZk = "test1.zk.com:12345/pinot-cluster";
- final String realZk = "test2.zk.com:12345/pinot-cluster";
+ final String realZk = "zk-lca1-pinot.stg.linkedin.com:12913/pinot-cluster";
final String zkURL = realZk;
- final String clusterName = "mpSprintDemoCluster";
+ final String clusterName = "pinot";
final String helixClusterName = clusterName;
final String controllerInstanceId = "local-hostname";
final String localDiskDir = "/var/tmp/Controller";
final long externalViewOnlineToOfflineTimeoutMillis = 100L;
final boolean isSingleTenantCluster = false;
final boolean isUpdateStateModel = false;
- MetricsRegistry metricsRegistry = new MetricsRegistry();
+// MetricsRegistry metricsRegistry = new MetricsRegistry();
final boolean dryRun = true;
final String tableName = "testTable";
final TableType tableType = TableType.OFFLINE;
PinotHelixResourceManager helixResourceManager =
new PinotHelixResourceManager(zkURL, helixClusterName, controllerInstanceId, localDiskDir,
- externalViewOnlineToOfflineTimeoutMillis, isSingleTenantCluster, isUpdateStateModel);
+ externalViewOnlineToOfflineTimeoutMillis, isSingleTenantCluster, isUpdateStateModel, false);
helixResourceManager.start();
- ZNRecord record = helixResourceManager.rebalanceTable(tableName, dryRun, tableType);
- ObjectMapper mapper = new ObjectMapper();
- System.out.println(mapper.writerWithDefaultPrettyPrinter().writeValueAsString(record));
+
+ Set<String> instances = new HashSet<>();
+ instances.add("Server_lca1-app0168.stg.linkedin.com_8001");
+ instances.add("Server_lca1-app0170.stg.linkedin.com_8001");
+ BiMap<String, String> map = helixResourceManager.getDataInstanceAdminEndpoints(instances);
+ for (Map.Entry<String, String> entry : map.entrySet()) {
+ System.out.println(entry.getKey() + " " + entry.getValue());
+ }
+ System.out.println("~~~");
+// ZNRecord record = helixResourceManager.rebalanceTable(tableName, dryRun, tableType);
+// ObjectMapper mapper = new ObjectMapper();
+// System.out.println(mapper.writerWithDefaultPrettyPrinter().writeValueAsString(record));
}
- */
}
diff --git a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/PinotResourceManagerTest.java b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/PinotResourceManagerTest.java
index e3c59d4..08c7bc6 100644
--- a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/PinotResourceManagerTest.java
+++ b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/PinotResourceManagerTest.java
@@ -26,9 +26,12 @@ import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
+import org.apache.helix.AccessOption;
import org.apache.helix.HelixAdmin;
+import org.apache.helix.ZNRecord;
import org.apache.helix.manager.zk.ZkClient;
import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
@@ -66,6 +69,13 @@ public class PinotResourceManagerTest {
Assert.assertEquals(_helixAdmin.getInstancesInClusterWithTag(HELIX_CLUSTER_NAME, "DefaultTenant_REALTIME").size(),
1);
+ // Verifying instance configs.
+ ZNRecord znRecord = _pinotHelixResourceManager.getCacheInstanceConfigsDataAccessor().get("/Server_localhost_0", null, AccessOption.PERSISTENT);
+ Assert.assertNotNull(znRecord);
+ Assert.assertEquals("Server_localhost_0", znRecord.getId());
+ InstanceConfig instanceConfig = new InstanceConfig(znRecord);
+ Assert.assertEquals("Server_localhost", instanceConfig.getHostName());
+
// Adding table
TableConfig tableConfig =
new TableConfig.Builder(CommonConstants.Helix.TableType.OFFLINE).setTableName(TABLE_NAME).build();
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org