You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by jl...@apache.org on 2019/01/07 18:04:40 UTC
[incubator-pinot] branch master updated: Use
ZkCacheBaseDataAccessor to cache instance configs in
PinotHelixResourceManager (#3633)
This is an automated email from the ASF dual-hosted git repository.
jlli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new c25e5ea Use ZkCacheBaseDataAccessor to cache instance configs in PinotHelixResourceManager (#3633)
c25e5ea is described below
commit c25e5eaaab2478cf2a07d5bb438ee14e784c4113
Author: Jialiang Li <jl...@linkedin.com>
AuthorDate: Mon Jan 7 10:04:35 2019 -0800
Use ZkCacheBaseDataAccessor to cache instance configs in PinotHelixResourceManager (#3633)
* Use ZkCacheBaseDataAccessor to cache instance configs in PinotHelixResourceManager;
* Unit tests added.
---
.../helix/core/PinotHelixResourceManager.java | 35 ++++----
.../helix/core/PinotHelixResourceManagerTest.java | 96 ++++++++++++++++++++++
2 files changed, 117 insertions(+), 14 deletions(-)
diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/PinotHelixResourceManager.java
index 423db8c..d359c6f 100644
--- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -84,6 +84,7 @@ import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.commons.configuration.Configuration;
import org.apache.helix.AccessOption;
+import org.apache.helix.BaseDataAccessor;
import org.apache.helix.ClusterMessagingService;
import org.apache.helix.Criteria;
import org.apache.helix.HelixAdmin;
@@ -93,7 +94,10 @@ import org.apache.helix.HelixManager;
import org.apache.helix.InstanceType;
import org.apache.helix.PropertyKey;
import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.PropertyPathBuilder;
import org.apache.helix.ZNRecord;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.manager.zk.ZkCacheBaseDataAccessor;
import org.apache.helix.model.CurrentState;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
@@ -128,6 +132,7 @@ public class PinotHelixResourceManager {
private HelixAdmin _helixAdmin;
private ZkHelixPropertyStore<ZNRecord> _propertyStore;
private HelixDataAccessor _helixDataAccessor;
+ private ZkCacheBaseDataAccessor<ZNRecord> _cacheInstanceConfigsDataAccessor;
private Builder _keyBuilder;
private SegmentDeletionManager _segmentDeletionManager;
private TableRebalancer _tableRebalancer;
@@ -168,6 +173,13 @@ public class PinotHelixResourceManager {
_helixAdmin = _helixZkManager.getClusterManagmentTool();
_propertyStore = _helixZkManager.getHelixPropertyStore();
_helixDataAccessor = _helixZkManager.getHelixDataAccessor();
+ // Cache instance zk paths.
+ BaseDataAccessor<ZNRecord> baseDataAccessor = _helixDataAccessor.getBaseDataAccessor();
+
+ String instanceConfigs = PropertyPathBuilder.instanceConfig(_helixClusterName);
+ _cacheInstanceConfigsDataAccessor =
+ new ZkCacheBaseDataAccessor<>((ZkBaseDataAccessor<ZNRecord>) baseDataAccessor, instanceConfigs,
+ null, Collections.singletonList(instanceConfigs));
_keyBuilder = _helixDataAccessor.keyBuilder();
_segmentDeletionManager = new SegmentDeletionManager(_dataDir, _helixAdmin, _helixClusterName, _propertyStore);
ZKMetadataProvider.setClusterTenantIsolationEnabled(_propertyStore, _isSingleTenantCluster);
@@ -249,23 +261,17 @@ public class PinotHelixResourceManager {
*/
@Nonnull
public List<String> getAllInstances() {
- return _helixAdmin.getInstancesInCluster(_helixClusterName);
- }
-
- /**
- * Get key builder
- * @return
- */
- @Nonnull
- public Builder getKeyBuilder() {
- return _keyBuilder;
+ return _cacheInstanceConfigsDataAccessor.getChildNames("/", AccessOption.PERSISTENT);
}
/**
* Returns the config for all the Helix instances in the cluster.
*/
public List<InstanceConfig> getAllHelixInstanceConfigs() {
- return HelixHelper.getInstanceConfigs(_helixZkManager);
+ List<ZNRecord> znRecords = _cacheInstanceConfigsDataAccessor.getChildren("/", null, AccessOption.PERSISTENT);
+ List<InstanceConfig> instanceConfigs = new ArrayList<>(znRecords.size());
+ znRecords.forEach(znRecord -> instanceConfigs.add(new InstanceConfig(znRecord)));
+ return instanceConfigs;
}
/**
@@ -276,7 +282,8 @@ public class PinotHelixResourceManager {
*/
@Nonnull
public InstanceConfig getHelixInstanceConfig(@Nonnull String instanceId) {
- return _helixAdmin.getInstanceConfig(_helixClusterName, instanceId);
+ ZNRecord znRecord = _cacheInstanceConfigsDataAccessor.get("/" + instanceId, null, AccessOption.PERSISTENT);
+ return new InstanceConfig(znRecord);
}
/**
@@ -1519,7 +1526,7 @@ public class PinotHelixResourceManager {
ClusterMessagingService messagingService = _helixZkManager.getMessagingService();
LOGGER.info("Sending timeboundary refresh message for segment {} of table {}:{} to recipients {}", segmentName,
- rawTableName, refreshMessage, recipientCriteria);
+ rawTableName, refreshMessage, recipientCriteria);
// Helix sets the timeoutMs argument specified in 'send' call as the processing timeout of the message.
int nMsgsSent = messagingService.send(recipientCriteria, refreshMessage, null, timeoutMs);
if (nMsgsSent > 0) {
@@ -1529,7 +1536,7 @@ public class PinotHelixResourceManager {
// May be the case when none of the brokers are up yet. That is OK, because when they come up they will get
// the latest time boundary info.
LOGGER.warn("Unable to send timeboundary refresh message for {} of table {}, nMsgs={}", segmentName,
- offlineTableName, nMsgsSent);
+ offlineTableName, nMsgsSent);
}
}
diff --git a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/PinotHelixResourceManagerTest.java b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/PinotHelixResourceManagerTest.java
index f93bf73..fe0c808 100644
--- a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/PinotHelixResourceManagerTest.java
+++ b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/PinotHelixResourceManagerTest.java
@@ -30,9 +30,16 @@ import com.linkedin.pinot.common.utils.ZkStarter;
import com.linkedin.pinot.controller.ControllerConf;
import com.linkedin.pinot.controller.helix.ControllerRequestBuilderUtil;
import com.linkedin.pinot.controller.helix.ControllerTest;
+import java.util.HashSet;
import java.util.List;
+import java.util.Random;
import java.util.Set;
+import org.I0Itec.zkclient.ZkClient;
+import org.apache.helix.PropertyPathBuilder;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
@@ -47,6 +54,8 @@ public class PinotHelixResourceManagerTest extends ControllerTest {
private static final String TABLE_NAME = "testTable";
private static final String OFFLINE_TABLE_NAME = TableNameBuilder.OFFLINE.tableNameWithType(TABLE_NAME);
private static final String REALTIME_TABLE_NAME = TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME);
+ private static final int CONNECTION_TIMEOUT_IN_MILLISECOND = 10_000;
+ private static final int MAX_TIMEOUT_IN_MILLISECOND = 5_000;
private final String _helixClusterName = getHelixClusterName();
@@ -79,6 +88,87 @@ public class PinotHelixResourceManagerTest extends ControllerTest {
}
@Test
+ public void testGetInstanceConfigs() throws Exception {
+ Set<String> servers = _helixResourceManager.getAllInstancesForServerTenant(SERVER_TENANT_NAME);
+ for (String server : servers) {
+ InstanceConfig cachedInstanceConfig = _helixResourceManager.getHelixInstanceConfig(server);
+ InstanceConfig realInstanceConfig = _helixAdmin.getInstanceConfig(_helixClusterName, server);
+ Assert.assertEquals(cachedInstanceConfig, realInstanceConfig);
+ }
+
+ ZkClient zkClient = new ZkClient(_helixResourceManager.getHelixZkURL(), CONNECTION_TIMEOUT_IN_MILLISECOND,
+ CONNECTION_TIMEOUT_IN_MILLISECOND, new ZNRecordSerializer());
+
+ modifyExistingInstanceConfig(zkClient);
+ addAndRemoveNewInstanceConfig(zkClient);
+
+ zkClient.close();
+ }
+
+ private void modifyExistingInstanceConfig(ZkClient zkClient) throws InterruptedException {
+ String instanceName = "Server_localhost_" + new Random().nextInt(NUM_INSTANCES);
+ String instanceConfigPath = PropertyPathBuilder.instanceConfig(_helixClusterName, instanceName);
+ Assert.assertTrue(zkClient.exists(instanceConfigPath));
+ ZNRecord znRecord = zkClient.readData(instanceConfigPath, null);
+
+ InstanceConfig cachedInstanceConfig = _helixResourceManager.getHelixInstanceConfig(instanceName);
+ String originalPort = cachedInstanceConfig.getPort();
+ Assert.assertNotNull(originalPort);
+ String newPort = Long.toString(System.currentTimeMillis());
+ Assert.assertTrue(!newPort.equals(originalPort));
+
+ // Set new port to this instance config.
+ znRecord.setSimpleField(InstanceConfig.InstanceConfigProperty.HELIX_PORT.toString(), newPort);
+ zkClient.writeData(instanceConfigPath, znRecord);
+
+ long maxTime = System.currentTimeMillis() + MAX_TIMEOUT_IN_MILLISECOND;
+ InstanceConfig latestCachedInstanceConfig = _helixResourceManager.getHelixInstanceConfig(instanceName);
+ String latestPort = latestCachedInstanceConfig.getPort();
+ while (!newPort.equals(latestPort) && System.currentTimeMillis() < maxTime) {
+ Thread.sleep(100L);
+ latestCachedInstanceConfig = _helixResourceManager.getHelixInstanceConfig(instanceName);
+ latestPort = latestCachedInstanceConfig.getPort();
+ }
+ Assert.assertTrue(System.currentTimeMillis() < maxTime, "Timeout when waiting for adding instance config");
+
+ // Set original port back to this instance config.
+ znRecord.setSimpleField(InstanceConfig.InstanceConfigProperty.HELIX_PORT.toString(), originalPort);
+ zkClient.writeData(instanceConfigPath, znRecord);
+ }
+
+ private void addAndRemoveNewInstanceConfig(ZkClient zkClient) throws Exception {
+ int biggerRandomNumber = NUM_INSTANCES + new Random().nextInt(NUM_INSTANCES);
+ String instanceName = "Server_localhost_" + String.valueOf(biggerRandomNumber);
+ String instanceConfigPath = PropertyPathBuilder.instanceConfig(_helixClusterName, instanceName);
+ Assert.assertFalse(zkClient.exists(instanceConfigPath));
+ List<String> instances = _helixResourceManager.getAllInstances();
+ Assert.assertFalse(instances.contains(instanceName));
+
+ // Add new ZNode.
+ ZNRecord znRecord = new ZNRecord(instanceName);
+ zkClient.createPersistent(instanceConfigPath, znRecord);
+
+ List<String> latestAllInstances = _helixResourceManager.getAllInstances();
+ long maxTime = System.currentTimeMillis() + MAX_TIMEOUT_IN_MILLISECOND;
+ while (!latestAllInstances.contains(instanceName) && System.currentTimeMillis() < maxTime) {
+ Thread.sleep(100L);
+ latestAllInstances = _helixResourceManager.getAllInstances();
+ }
+ Assert.assertTrue(System.currentTimeMillis() < maxTime, "Timeout when waiting for adding instance config");
+
+ // Remove new ZNode.
+ zkClient.delete(instanceConfigPath);
+
+ latestAllInstances = _helixResourceManager.getAllInstances();
+ maxTime = System.currentTimeMillis() + MAX_TIMEOUT_IN_MILLISECOND;
+ while (latestAllInstances.contains(instanceName) && System.currentTimeMillis() < maxTime) {
+ Thread.sleep(100L);
+ latestAllInstances = _helixResourceManager.getAllInstances();
+ }
+ Assert.assertTrue(System.currentTimeMillis() < maxTime, "Timeout when waiting for removing instance config");
+ }
+
+ @Test
public void testRebuildBrokerResourceFromHelixTags() throws Exception {
// Create broker tenant on 3 Brokers
Tenant brokerTenant =
@@ -165,6 +255,12 @@ public class PinotHelixResourceManagerTest extends ControllerTest {
}
}
+ @Test
+ public void testGetDataInstanceAdminEndpoints() {
+ Set<String> fakeInstances = new HashSet<>();
+ new Random().nextInt(NUM_INSTANCES);
+ }
+
@AfterClass
public void tearDown() {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org